设为首页 - 加入收藏 ASP站长网(Aspzz.Cn)- 科技、建站、经验、云计算、5G、大数据,站长网!
热搜: 创业者 手机 数据
当前位置: 首页 > 站长学院 > PHP教程 > 正文

php-beanstalkd消息队列类实例分享(2)

发布时间:2021-01-27 16:34 所属栏目:121 来源:网络整理
导读:/** Pushes an error message to the logger, when one is configured. @param string $message The error message. @return void */ protected function _error($message) { if ($this->_config['logger']) { $this->…

    /**
     * Pushes an error message to the logger, when one is configured.
     *
     * @param string $message The error message.
     * @return void
     */
    protected function _error($message) {
        // Look up the configured logger; a falsy entry means logging is off.
        $logger = $this->_config['logger'];
        if (!$logger) {
            return;
        }
        $logger->error($message);
    }

/**
 * Returns the value stored under the 'logger' key of the configuration,
 * i.e. the same object that _error() forwards messages to.
 *
 * @return mixed The configured logger entry.
 */
public function errors()
{
    $logger = $this->_config['logger'];

    return $logger;
}
    /**
     * Writes a packet to the socket. Prior to writing to the socket will
     * check for availability of the connection.
     *
     * @param string $data
     * @return integer|boolean number of written bytes or false on error.
     */
    protected function _write($data) {
        // Refuse to touch the socket while the connection flag is not set.
        if (!$this->connected) {
            // Wording fixed ("connecting" -> "connection") so it matches the
            // equivalent message raised by _read().
            $message = 'No connection found while writing data to socket.';
            throw new RuntimeException($message);
        }

        // Terminate the packet with CRLF before handing it to the socket.
        $data .= "\r\n";

        // Returns the number of bytes written, or false on failure.
        return fwrite($this->_connection, $data, strlen($data));
    }

    /**
     * Reads a packet from the socket. Prior to reading from the socket
     * will check for availability of the connection.
     *
     * @param integer $length Number of bytes to read.
     * @return string|boolean Data or false on error.
     */
    protected function _read($length = null) {
        if (!$this->connected) {
            $message = 'No connection found while reading data from socket.';
            throw new RuntimeException($message);
        }

        if (!$length) {
            // No (or zero) length requested: read one line, up to the CRLF
            // delimiter, which stream_get_line() does not include in the result.
            return stream_get_line($this->_connection, 16384, "\r\n");
        }

        if (feof($this->_connection)) {
            return false;
        }

        // Read the requested payload plus the 2-byte CRLF that follows it.
        $data = stream_get_contents($this->_connection, $length + 2);
        $meta = stream_get_meta_data($this->_connection);

        if ($meta['timed_out']) {
            $message = 'Connection timed out while reading data from socket.';
            throw new RuntimeException($message);
        }
        if ($data === false) {
            // Report a failed read with the documented error value.
            return false;
        }

        // Remove only the terminating CRLF. rtrim($data, "\r\n") treats its
        // second argument as a character mask and would also eat CR/LF bytes
        // that belong to the payload itself (e.g. a body ending in "\n").
        if (substr($data, -2) === "\r\n") {
            return substr($data, 0, strlen($data) - 2);
        }

        return $data;
    }

/* Producer Commands */

    /**
     * The `put` command is for any process that wants to insert a job into the queue.
     *
     * @param integer $pri Jobs with smaller priority values will be scheduled
     *        before jobs with larger priorities. The most urgent priority is
     *        0; the least urgent priority is 4294967295.
     * @param integer $delay Seconds to wait before putting the job in the
     *        ready queue. The job will be in the "delayed" state during this time.
     * @param integer $ttr Time to run - Number of seconds to allow a worker to
     *        run this job. The minimum ttr is 1.
     * @param string $data The job body.
     * @return integer|boolean `false` on error otherwise an integer indicating
     *         the job id.
     */
    public function put($pri, $delay, $ttr, $data) {
        // The format string has five placeholders: priority, delay, ttr and
        // body size on the command line, then the body after a CRLF. All five
        // values must be supplied; omitting $delay and $ttr makes sprintf()
        // fail and shifts the size and body into the wrong slots.
        $command = sprintf(
            "put %d %d %d %d\r\n%s",
            $pri,
            $delay,
            $ttr,
            strlen($data),
            $data
        );
        $this->_write($command);

        // The first token of the reply line is the status word.
        $status = strtok($this->_read(), ' ');

        switch ($status) {
            case 'INSERTED':
            case 'BURIED':
                // The next token of the same reply line is the job id.
                return (int) strtok(' ');
            case 'EXPECTED_CRLF':
            case 'JOB_TOO_BIG':
            default:
                // Every other reply is logged and reported as a failure.
                $this->_error($status);
                return false;
        }
    }

    /**
     * The `use` command is for producers. Subsequent put commands will put
     * jobs into the tube specified by this command. If no use command has
     * been issued, jobs will be put into the tube named `default`.
     *
     * @param string $tube A name at most 200 bytes. It specifies the tube to
     *        use. If the tube does not exist, it will be created.
     * @return string|boolean `false` on error otherwise the name of the tube.
     */
    public function useTube($tube) {
        $command = sprintf('use %s', $tube);
        $this->_write($command);

        // The first token of the reply line is the status word.
        $reply = strtok($this->_read(), ' ');

        // Loose comparison on purpose: it mirrors what a switch statement does.
        if ($reply == 'USING') {
            // The next token of the same reply line is the tube name.
            return strtok(' ');
        }

        // Any other reply is logged and reported as a failure.
        $this->_error($reply);
        return false;
    }

    /**
     * Pause a tube, delaying any new job in it being reserved for a given time.
     *
     * @param string $tube The name of the tube to pause.
     * @param integer $delay Number of seconds to wait before reserving any more
     *        jobs from the queue.
     * @return boolean `false` on error otherwise `true`.
     */
    public function pauseTube($tube, $delay) {
        $command = sprintf('pause-tube %s %d', $tube, $delay);
        $this->_write($command);

        // The first token of the reply line is the status word.
        $reply = strtok($this->_read(), ' ');

        // Loose comparison on purpose: it mirrors what a switch statement does.
        if ($reply == 'PAUSED') {
            return true;
        }

        // 'NOT_FOUND' and any other reply are logged and reported as a failure.
        $this->_error($reply);
        return false;
    }

/* Worker Commands */

(编辑:ASP站长网)

网友评论
推荐文章
    热点阅读