1. 主页 > 企业网站运营 > 网站SEO优化

PHP模拟supervisor的进程管理

高价值的内容是做好企业营销推广的前提,如果你对企业内容营销感兴趣的话,不妨看看MarketUP近期整理的《2023内容营销获客实战白皮书》,希望能给大家有一些实质性的帮助,预计发布400份!送完下架,赶快领取!感兴趣的朋友可以点击链接即可下载阅读:《2023内容营销获客实战白皮书》

PHP模拟supervisor的进程管理(图1)

前言

模拟supervisor进程管理DEMO(简易实现)

截图:

实现

1、在主进程循环内启动子进程执行命令
2、在web输入 127.0.0.1:7865 获取子进程状态
3、socket接收请求消息,并且执行相应操作,返回web页面
4、回收子进程,防止称为僵尸进程

不足:无法持续监听错误页面。由于socket得到的响应是通过include函数加载的,所以在加载的页面内不能出现tail -f命令,否则stream就会掉入了死循环了~。我想应该有方案解决(写了socket+多进程模式,模仿fpm在接收到请求之后就启动一个子进程去处理的模式,但是执行有问题。因此将代码贴出来希望得到大家的指点)。
延伸:由于对进程可以很好的管理(期望如此),那么就可以定制化自己的一些需求,比如:(1)定制AMQP的消费者进程管理服务。(2)模拟crontab定时服务。

知识点

代码实现的过程中,有很多的细节是值得学习的。
1、在while()循环中,启用了stream的非阻塞模式。所以不能在循环中使用sleep(1),而是用stream_select($read, $write, $except, 1)让stream内部阻塞。
关于阻塞非阻塞模式,可以参阅这里
2、能够执行外部程序的函数很多,但是都稍有不同。这里采用的是proc_open,是一个很强大的函数。在这之前我曾用pcntl_exec执行过外部程序,但是需要先pcntl_fork。而用其他的如exec,shell_exec无法对子进程进行管理。
3、重启或停止等操作子进程时,只是先更改主进程中该子进程在内存中的的状态,并不是真正的对子进程操作。在统一处init()处理子进程。如此才能防止因为子进程启动时的上下文导致的一些怪异的现象。

代码

主进程代码:Process.php

<?php
require_once __DIR__ . '/Consumer.php';require_once __DIR__ . '/StreamConnection.php';require_once __DIR__ . '/Http.php';class Process{
/** 
* 待启动的消费者数组
*/
protected $consumers = array();
protected $childPids = array();
const PPID_FILE = __DIR__ . '/process';
protected $serializerConsumer;
public function __construct()
{
$this->consumers = $this->getConsumers();
}
// 这里是个DEMO,实际可以用读取配置文件的方式。
public function getConsumers()
{
$consumer = new Consumer([
'program' => 'test',
'command' => '/usr/bin/php test.php',
'directory' => __DIR__,
'logfile' => __DIR__ . '/test.log',
'uniqid' => uniqid(),
'auto_restart' => false,
]);
return [
$consumer->uniqid => $consumer,
];
}
public function run()
{
if (empty($this->consumers)) {
// consumer empty
return;
}
if ($this->_notifyMaster()) {
// master alive
return;
}
$pid = pcntl_fork();
if ($pid < 0) {
exit;
} elseif ($pid > 0) {
exit;
}
if (!posix_setsid()) {
exit;
}
$stream = new StreamConnection('tcp://0.0.0.0:7865');
@cli_set_process_title('AMQP Master Process');
// 将主进程ID写入文件
file_put_contents(self::PPID_FILE, getmypid());
// master进程继续
while (true) {
$this->init();
pcntl_signal_dispatch();
$this->waitpid();
// 如果子进程被全部回收,则主进程退出
// if (empty($this->childPids)) {
//     $stream->close($stream->getSocket());
//     break;
// }
$stream->accept(function ($uniqid, $action) {
$this->handle($uniqid, $action);
return $this->display();
});
}
}
protected function init()
{
foreach ($this->consumers as &$c) {
switch ($c->state) {
case Consumer::RUNNING:
case Consumer::STOP:
break;
case Consumer::NOMINAL:
case Consumer::STARTING:
$this->fork($c);
break;
case Consumer::STOPING:
if ($c->pid && posix_kill($c->pid, SIGTERM)) {
$this->reset($c, Consumer::STOP);
}
break;
case Consumer::RESTART:
if (empty($c->pid)) {
$this->fork($c);
break;
}
if (posix_kill($c->pid, SIGTERM)) {
$this->reset($c, Consumer::STOP);
$this->fork($c);
}
break;
default:
break;
}
}
}
protected function reset(Consumer $c, $state)
{
$c->pid = '';
$c->uptime = '';
$c->state = $state;
$c->process = null;
}
protected function waitpid()
{
foreach ($this->childPids as $uniqid => $pid) {
$result = pcntl_waitpid($pid, $status, WNOHANG);
if ($result == $pid || $result == -1) {
unset($this->childPids[$uniqid]);
$c = &$this->consumers[$uniqid];
$state = pcntl_wifexited($status) ? Consumer::EXITED : Consumer::STOP;
$this->reset($c, $state);
}
}
}
/**
* 父进程存活情况下,只会通知父进程信息,否则可能产生多个守护进程
*/
private function _notifyMaster()
{
$ppid = file_get_contents(self::PPID_FILE );
$isAlive = $this->checkProcessAlive($ppid);
if (!$isAlive) return false;
return true;
}
public function checkProcessAlive($pid)
{
if (empty($pid)) return false;
$pidinfo = `ps co pid {$pid} | xargs`;
$pidinfo = trim($pidinfo);
$pattern = "/.*?PID.*?(\\d+).*?/";
preg_match($pattern, $pidinfo, $matches);
return empty($matches) ? false : ($matches[1] == $pid ? true : false);
}
/**
* fork一个新的子进程
*/
protected function fork(Consumer $c)
{
$descriptorspec = [2 => ['file', $c->logfile, 'a'],];
$process = proc_open('exec ' . $c->command, $descriptorspec, $pipes, $c->directory);
if ($process) {
$ret = proc_get_status($process);
if ($ret['running']) {
$c->state = Consumer::RUNNING;
$c->pid = $ret['pid'];
$c->process = $process;
$c->uptime = date('m-d H:i');
$this->childPids[$c->uniqid] = $ret['pid'];
} else {
$c->state = Consumer::EXITED;
proc_close($process);
}
} else {
$c->state = Consumer::ERROR;
}
return $c;
}
public function display()
{
$location = 'http://127.0.0.1:7865';
$basePath = Http::$basePath;
$scriptName = isset($_SERVER['SCRIPT_NAME']) &&
!empty($_SERVER['SCRIPT_NAME']) &&
$_SERVER['SCRIPT_NAME'] != '/' ? $_SERVER['SCRIPT_NAME'] : '/index.php';
if ($scriptName == '/index.html') {
return Http::status_301($location);
}
$sourcePath = $basePath . $scriptName;
if (!is_file($sourcePath)) {
return Http::status_404();
}
ob_start();
include $sourcePath;
$response = ob_get_contents();
ob_clean();
return Http::status_200($response);
}
public function handle($uniqid, $action)
{
if (!empty($uniqid) && !isset($this->consumers[$uniqid])) {
return;
}
switch ($action) {
case 'refresh':
break;
case 'restartall':
$this->killall(true);
break;
case 'stopall':
$this->killall();
break;
case 'stop':
$c = &$this->consumers[$uniqid];
if ($c->state != Consumer::RUNNING) break;
$c->state = Consumer::STOPING;
break;
case 'start':
$c = &$this->consumers[$uniqid];
if ($c->state == Consumer::RUNNING) break;
$c->state = Consumer::STARTING;
break;
case 'restart':
$c = &$this->consumers[$uniqid];
$c->state = Consumer::RESTART;
break;
case 'copy':
$c = $this->consumers[$uniqid];
$newC = clone $c;
$newC->uniqid = uniqid('C');
$newC->state = Consumer::NOMINAL;
$newC->pid = '';
$this->consumers[$newC->uniqid] = $newC;
break;
default:
break;
}
}
protected function killall($restart = false)
{
foreach ($this->consumers as &$c) {
$c->state = $restart ? Consumer::RESTART : Consumer::STOPING;
}
}}$cli = new Process();$cli->run();

Consumer消费者对象

<?php
require_once __DIR__ . '/Baseobject.php';class Consumer extends BaseObject{
/** 开启多少个消费者 */
public $numprocs = 1;
/** 当前配置的唯一标志 */
public $program;
/** 执行的命令 */
public $command;
/** 当前工作的目录 */
public $directory;
/** 通过 $qos $queueName $duplicate 生成的 $queue */
public $queue;
/** 程序执行日志记录 */
public $logfile = '';
/** 消费进程的唯一ID */
public $uniqid;
/** 进程IDpid */
public $pid;
/** 进程状态 */
public $state = self::NOMINAL;
/** 自启动 */
public $auto_restart = false;
public $process;
/** 启动时间 */
public $uptime;
const RUNNING = 'running';
const STOP = 'stoped';
const NOMINAL = 'nominal';
const RESTART = 'restart';
const STOPING = 'stoping';
const STARTING = 'stating';
const ERROR = 'error';
const BLOCKED = 'blocked';
const EXITED = 'exited';
const FATEL = 'fatel';}

stream相关代码:StreamConnection.php

<?php
class StreamConnection{
protected $socket;
protected $timeout = 2; //s
protected $client;
public function __construct($host)
{
$this->socket = $this->connect($host);
}
public function connect($host)
{
$socket = stream_socket_server($host, $errno, $errstr);
if (!$socket) {
exit('stream error');
}
stream_set_timeout($socket, $this->timeout);
stream_set_chunk_size($socket, 1024);
stream_set_blocking($socket, false);
$this->client = [$socket];
return $socket;
}
public function accept(Closure $callback)
{
$read = $this->client;
if (stream_select($read, $write, $except, 1) < 1) return;
if (in_array($this->socket, $read)) {
$cs = stream_socket_accept($this->socket);
$this->client[] = $cs;
}
foreach ($read as $s) {
if ($s == $this->socket) continue;
$header = fread($s, 1024);
if (empty($header)) {
$index = array_search($s, $this->client);
if ($index)
unset($this->client[$index]);
$this->close($s);
continue;
}
Http::parse_http($header);
$uniqid = isset($_GET['uniqid']) ? $_GET['uniqid'] : '';
$action = isset($_GET['action']) ? $_GET['action'] : '';
$response = $callback($uniqid, $action);
$this->write($s, $response);
$index = array_search($s, $this->client);
if ($index)
unset($this->client[$index]);
$this->close($s);
}
}
public function write($socket, $response)
{
$ret = fwrite($socket, $response, strlen($response));
}
public function close($socket)
{
$flag = fclose($socket);
}
public function getSocket()
{
return $this->socket;
}}

Http响应代码:Http.php

<?php
class Http{
public static $basePath = __DIR__ . '/views';
public static $max_age = 120; //秒
/*
*  函数:     parse_http
*  描述:     解析http协议
*/
public static function parse_http($http)
{
// 初始化
$_POST = $_GET = $_COOKIE = $_REQUEST = $_SESSION = $_FILES =  array();
$GLOBALS['HTTP_RAW_POST_DATA'] = '';
// 需要设置的变量名
$_SERVER = array(
'QUERY_STRING' => '',
'REQUEST_METHOD' => '',
'REQUEST_URI' => '',
'SERVER_PROTOCOL' => '',
'SERVER_SOFTWARE' => '',
'SERVER_NAME' => '',
'HTTP_HOST' => '',
'HTTP_USER_AGENT' => '',
'HTTP_ACCEPT' => '',
'HTTP_ACCEPT_LANGUAGE' => '',
'HTTP_ACCEPT_ENCODING' => '',
'HTTP_COOKIE' => '',
'HTTP_CONNECTION' => '',
'REMOTE_ADDR' => '',
'REMOTE_PORT' => '0',
'SCRIPT_NAME' => '',
'HTTP_REFERER' => '',
'CONTENT_TYPE' => '',
'HTTP_IF_NONE_MATCH' => '',
);
// 将header分割成数组
list($http_header, $http_body) = explode("\\r\\n\\r\\n", $http, 2);
$header_data = explode("\\r\\n", $http_header);
list($_SERVER['REQUEST_METHOD'], $_SERVER['REQUEST_URI'], $_SERVER['SERVER_PROTOCOL']) = explode(' ', $header_data[0]);
unset($header_data[0]);
foreach ($header_data as $content) {
// \\r\\n\\r\\n
if (empty($content)) {
continue;
}
list($key, $value) = explode(':', $content, 2);
$key = strtolower($key);
$value = trim($value);
switch ($key) {
case 'host':
$_SERVER['HTTP_HOST'] = $value;
$tmp = explode(':', $value);
$_SERVER['SERVER_NAME'] = $tmp[0];
if (isset($tmp[1])) {
$_SERVER['SERVER_PORT'] = $tmp[1];
}
break;
case 'cookie':
$_SERVER['HTTP_COOKIE'] = $value;
parse_str(str_replace('; ', '&', $_SERVER['HTTP_COOKIE']), $_COOKIE);
break;
case 'user-agent':
$_SERVER['HTTP_USER_AGENT'] = $value;
break;
case 'accept':
$_SERVER['HTTP_ACCEPT'] = $value;
break;
case 'accept-language':
$_SERVER['HTTP_ACCEPT_LANGUAGE'] = $value;
break;
case 'accept-encoding':
$_SERVER['HTTP_ACCEPT_ENCODING'] = $value;
break;
case 'connection':
$_SERVER['HTTP_CONNECTION'] = $value;
break;
case 'referer':
$_SERVER['HTTP_REFERER'] = $value;
break;
case 'if-modified-since':
$_SERVER['HTTP_IF_MODIFIED_SINCE'] = $value;
break;
case 'if-none-match':
$_SERVER['HTTP_IF_NONE_MATCH'] = $value;
break;
case 'content-type':
if (!preg_match('/boundary="?(\\S+)"?/', $value, $match)) {
$_SERVER['CONTENT_TYPE'] = $value;
} else {
$_SERVER['CONTENT_TYPE'] = 'multipart/form-data';
$http_post_boundary = '--' . $match[1];
}
break;
}
}
// script_name
$_SERVER['SCRIPT_NAME'] = parse_url($_SERVER['REQUEST_URI'], PHP_URL_PATH);
// QUERY_STRING
$_SERVER['QUERY_STRING'] = parse_url($_SERVER['REQUEST_URI'], PHP_URL_QUERY);
if ($_SERVER['QUERY_STRING']) {
// $GET
parse_str($_SERVER['QUERY_STRING'], $_GET);
} else {
$_SERVER['QUERY_STRING'] = '';
}
// REQUEST
$_REQUEST = array_merge($_GET, $_POST);
return array('get' => $_GET, 'post' => $_POST, 'cookie' => $_COOKIE, 'server' => $_SERVER, 'files' => $_FILES);
}
public static function status_404()
{
return <<<EOFHTTP/1.1 404 OK
content-type: text/htmlEOF;
}
public static function status_301($location)
{
return <<<EOFHTTP/1.1 301 Moved Permanently
Content-Length: 0
Content-Type: text/plain
Location: $locationCache-Control: no-cacheEOF;
}
public static function status_304()
{
return <<<EOFHTTP/1.1 304 Not Modified
Content-Length: 0EOF;
}
public static function status_200($response)
{
$contentType = $_SERVER['CONTENT_TYPE'];
$length = strlen($response);
$header = '';
if ($contentType)
$header = 'Cache-Control: max-age=180';
return <<<EOFHTTP/1.1 200 OK
Content-Type: $contentTypeContent-Length: $length$header$responseEOF;
}}

待执行的脚本:test.php

<?php
while(true) {
file_put_contents(__DIR__  .  '/test.log', date('Y-m-d H:i:s'));
sleep(1);}

在当前目录下的视图页面:
|- Process.php
|- Http.php
|- StreamConnection.php
|- Consumer.php
|- BaseObject.php
|- views/

更多编程相关知识,请访问:编程教学!!

关于PHP模拟supervisor的进程管理的文章就分享到这,如果对你有帮助欢迎继续关注我们哦

以上就是今天分享的全部内容,希望能够对广大企业营销人员有一些营销方面的启发。作为国内知名的营销自动化平台,MarketUP通过完善的企业营销自动化系统,帮助企业实现有效和有意义的企业营销工作,更好地满足您每个目标受众端到端的需求,为您的渠道带来更多理想的潜在客户并留存孵化。如果您对我们的营销自动化产品或功能感兴趣,欢迎点击【这里】进行演示申请,我们将有专业的团队为您提供服务。

本文由MarketUP营销自动化博客发布,不代表MarketUP立场,转载联系作者并注明出处:https://www.marketup.cn/marketupblog/jianzhan/seo/18446.html

联系我们

手机号:19951984030

微信号:marketup01

工作日:8:30-18:00,节假日休息