Install psalm as dev, sync scripts updates

This commit is contained in:
Clemens Schwaighofer
2023-03-09 16:27:10 +09:00
parent 6bec59e387
commit feba79a2e8
2099 changed files with 283333 additions and 32 deletions

View File

@@ -0,0 +1,135 @@
name: Continuous Integration
on:
push: null
pull_request:
branches:
- master
jobs:
unit_tests:
strategy:
matrix:
include:
- operating-system: 'ubuntu-latest'
php-version: '7.1'
- operating-system: 'ubuntu-latest'
php-version: '7.2'
- operating-system: 'ubuntu-latest'
php-version: '7.3'
- operating-system: 'ubuntu-latest'
php-version: '7.4'
- operating-system: 'ubuntu-latest'
php-version: '8.0'
composer-flags: '--ignore-platform-req=php'
- operating-system: 'windows-latest'
php-version: '8.0'
composer-flags: '--ignore-platform-req=php'
- operating-system: 'macos-latest'
php-version: '8.0'
composer-flags: '--ignore-platform-req=php'
name: PHP ${{ matrix.php-version }} on ${{ matrix.operating-system }}
runs-on: ${{ matrix.operating-system }}
steps:
- name: Setup PHP
uses: shivammathur/setup-php@v2
with:
php-version: ${{ matrix.php-version }}
- name: Use LF line ends
run: |
git config --global core.autocrlf false
git config --global core.eol lf
- name: Checkout code
uses: actions/checkout@v2
- name: Get Composer cache directory
id: composer-cache
run: echo "::set-output name=dir::$(composer config cache-dir)"
- name: Cache dependencies
uses: actions/cache@v2
with:
path: ${{ steps.composer-cache.outputs.dir }}
key: composer-${{ runner.os }}-${{ matrix.php-version }}-${{ hashFiles('**/composer.*') }}-${{ matrix.composer-flags }}
restore-keys: |
composer-${{ runner.os }}-${{ matrix.php-version }}-${{ hashFiles('**/composer.*') }}-
composer-${{ runner.os }}-${{ matrix.php-version }}-
- name: Install dependencies
uses: nick-invision/retry@v2
with:
timeout_minutes: 5
max_attempts: 5
retry_wait_seconds: 30
command: |
php_version=$(php -v)
composer update --optimize-autoloader --no-interaction --no-progress ${{ matrix.composer-flags }}
composer info -D
- name: Run unit tests
run: vendor/bin/phpunit --verbose
coding_standards:
strategy:
matrix:
include:
- operating-system: 'ubuntu-latest'
php-version: '8.0'
composer-flags: '--ignore-platform-req=php'
name: Coding standards
runs-on: ${{ matrix.operating-system }}
steps:
- name: Setup PHP
uses: shivammathur/setup-php@v2
with:
php-version: ${{ matrix.php-version }}
- name: Use LF line ends
run: |
git config --global core.autocrlf false
git config --global core.eol lf
- name: Checkout code
uses: actions/checkout@v2
- name: Get Composer cache directory
id: composer-cache
run: echo "::set-output name=dir::$(composer config cache-dir)"
- name: Cache dependencies
uses: actions/cache@v2
with:
path: ${{ steps.composer-cache.outputs.dir }}
key: composer-${{ runner.os }}-${{ matrix.php-version }}-${{ hashFiles('**/composer.*') }}-${{ matrix.composer-flags }}
restore-keys: |
composer-${{ runner.os }}-${{ matrix.php-version }}-${{ hashFiles('**/composer.*') }}-
composer-${{ runner.os }}-${{ matrix.php-version }}-
- name: Install dependencies
uses: nick-invision/retry@v2
with:
timeout_minutes: 5
max_attempts: 5
retry_wait_seconds: 30
command: |
php_version=$(php -v)
composer update --optimize-autoloader --no-interaction --no-progress ${{ matrix.composer-flags }}
composer info -D
- name: Run style fixer
env:
PHP_CS_FIXER_IGNORE_ENV: 1
run: vendor/bin/php-cs-fixer --diff --dry-run -v fix

22
vendor/amphp/byte-stream/LICENSE vendored Normal file
View File

@@ -0,0 +1,22 @@
The MIT License (MIT)
Copyright (c) 2016-2021 amphp
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

58
vendor/amphp/byte-stream/composer.json vendored Normal file
View File

@@ -0,0 +1,58 @@
{
"name": "amphp/byte-stream",
"homepage": "http://amphp.org/byte-stream",
"description": "A stream abstraction to make working with non-blocking I/O simple.",
"support": {
"issues": "https://github.com/amphp/byte-stream/issues",
"irc": "irc://irc.freenode.org/amphp"
},
"keywords": [
"stream",
"async",
"non-blocking",
"amp",
"amphp",
"io"
],
"license": "MIT",
"authors": [
{
"name": "Aaron Piotrowski",
"email": "aaron@trowski.com"
},
{
"name": "Niklas Keller",
"email": "me@kelunik.com"
}
],
"require": {
"php": ">=7.1",
"amphp/amp": "^2"
},
"require-dev": {
"amphp/phpunit-util": "^1.4",
"phpunit/phpunit": "^6 || ^7 || ^8",
"friendsofphp/php-cs-fixer": "^2.3",
"amphp/php-cs-fixer-config": "dev-master",
"psalm/phar": "^3.11.4",
"jetbrains/phpstorm-stubs": "^2019.3"
},
"autoload": {
"psr-4": {
"Amp\\ByteStream\\": "lib"
},
"files": [
"lib/functions.php"
]
},
"autoload-dev": {
"psr-4": {
"Amp\\ByteStream\\Test\\": "test"
}
},
"extra": {
"branch-alias": {
"dev-master": "1.x-dev"
}
}
}

View File

@@ -0,0 +1,65 @@
<?php
namespace Amp\ByteStream\Base64;
use Amp\ByteStream\InputStream;
use Amp\ByteStream\StreamException;
use Amp\Promise;
use function Amp\call;
final class Base64DecodingInputStream implements InputStream
{
/** @var InputStream|null */
private $source;
/** @var string|null */
private $buffer = '';
public function __construct(InputStream $source)
{
$this->source = $source;
}
public function read(): Promise
{
return call(function () {
if ($this->source === null) {
throw new StreamException('Failed to read stream chunk due to invalid base64 data');
}
$chunk = yield $this->source->read();
if ($chunk === null) {
if ($this->buffer === null) {
return null;
}
$chunk = \base64_decode($this->buffer, true);
if ($chunk === false) {
$this->source = null;
$this->buffer = null;
throw new StreamException('Failed to read stream chunk due to invalid base64 data');
}
$this->buffer = null;
return $chunk;
}
$this->buffer .= $chunk;
$length = \strlen($this->buffer);
$chunk = \base64_decode(\substr($this->buffer, 0, $length - $length % 4), true);
if ($chunk === false) {
$this->source = null;
$this->buffer = null;
throw new StreamException('Failed to read stream chunk due to invalid base64 data');
}
$this->buffer = \substr($this->buffer, $length - $length % 4);
return $chunk;
});
}
}

View File

@@ -0,0 +1,55 @@
<?php
namespace Amp\ByteStream\Base64;
use Amp\ByteStream\OutputStream;
use Amp\ByteStream\StreamException;
use Amp\Failure;
use Amp\Promise;
final class Base64DecodingOutputStream implements OutputStream
{
/** @var OutputStream */
private $destination;
/** @var string */
private $buffer = '';
/** @var int */
private $offset = 0;
public function __construct(OutputStream $destination)
{
$this->destination = $destination;
}
public function write(string $data): Promise
{
$this->buffer .= $data;
$length = \strlen($this->buffer);
$chunk = \base64_decode(\substr($this->buffer, 0, $length - $length % 4), true);
if ($chunk === false) {
return new Failure(new StreamException('Invalid base64 near offset ' . $this->offset));
}
$this->offset += $length - $length % 4;
$this->buffer = \substr($this->buffer, $length - $length % 4);
return $this->destination->write($chunk);
}
public function end(string $finalData = ""): Promise
{
$this->offset += \strlen($this->buffer);
$chunk = \base64_decode($this->buffer . $finalData, true);
if ($chunk === false) {
return new Failure(new StreamException('Invalid base64 near offset ' . $this->offset));
}
$this->buffer = '';
return $this->destination->end($chunk);
}
}

View File

@@ -0,0 +1,46 @@
<?php
namespace Amp\ByteStream\Base64;
use Amp\ByteStream\InputStream;
use Amp\Promise;
use function Amp\call;
final class Base64EncodingInputStream implements InputStream
{
/** @var InputStream */
private $source;
/** @var string|null */
private $buffer = '';
public function __construct(InputStream $source)
{
$this->source = $source;
}
public function read(): Promise
{
return call(function () {
$chunk = yield $this->source->read();
if ($chunk === null) {
if ($this->buffer === null) {
return null;
}
$chunk = \base64_encode($this->buffer);
$this->buffer = null;
return $chunk;
}
$this->buffer .= $chunk;
$length = \strlen($this->buffer);
$chunk = \base64_encode(\substr($this->buffer, 0, $length - $length % 3));
$this->buffer = \substr($this->buffer, $length - $length % 3);
return $chunk;
});
}
}

View File

@@ -0,0 +1,39 @@
<?php
namespace Amp\ByteStream\Base64;
use Amp\ByteStream\OutputStream;
use Amp\Promise;
final class Base64EncodingOutputStream implements OutputStream
{
/** @var OutputStream */
private $destination;
/** @var string */
private $buffer = '';
public function __construct(OutputStream $destination)
{
$this->destination = $destination;
}
public function write(string $data): Promise
{
$this->buffer .= $data;
$length = \strlen($this->buffer);
$chunk = \base64_encode(\substr($this->buffer, 0, $length - $length % 3));
$this->buffer = \substr($this->buffer, $length - $length % 3);
return $this->destination->write($chunk);
}
public function end(string $finalData = ""): Promise
{
$chunk = \base64_encode($this->buffer . $finalData);
$this->buffer = '';
return $this->destination->end($chunk);
}
}

View File

@@ -0,0 +1,7 @@
<?php
namespace Amp\ByteStream;
final class ClosedException extends StreamException
{
}

View File

@@ -0,0 +1,39 @@
<?php
namespace Amp\ByteStream;
use Amp\Promise;
use Amp\Success;
/**
* Input stream with a single already known data chunk.
*/
final class InMemoryStream implements InputStream
{
private $contents;
/**
* @param string|null $contents Data chunk or `null` for no data chunk.
*/
public function __construct(string $contents = null)
{
$this->contents = $contents;
}
/**
* Reads data from the stream.
*
* @return Promise<string|null> Resolves with the full contents or `null` if the stream has closed / already been consumed.
*/
public function read(): Promise
{
if ($this->contents === null) {
return new Success;
}
$promise = new Success($this->contents);
$this->contents = null;
return $promise;
}
}

View File

@@ -0,0 +1,38 @@
<?php
namespace Amp\ByteStream;
use Amp\Promise;
/**
* An `InputStream` allows reading byte streams in chunks.
*
* **Example**
*
* ```php
* function readAll(InputStream $in): Promise {
* return Amp\call(function () use ($in) {
* $buffer = "";
*
* while (($chunk = yield $in->read()) !== null) {
* $buffer .= $chunk;
* }
*
* return $buffer;
* });
* }
* ```
*/
interface InputStream
{
/**
* Reads data from the stream.
*
* @return Promise Resolves with a string when new data is available or `null` if the stream has closed.
*
* @psalm-return Promise<string|null>
*
* @throws PendingReadError Thrown if another read operation is still pending.
*/
public function read(): Promise;
}

View File

@@ -0,0 +1,52 @@
<?php
namespace Amp\ByteStream;
use Amp\Promise;
use Amp\Success;
use function Amp\call;
final class InputStreamChain implements InputStream
{
/** @var InputStream[] */
private $streams;
/** @var bool */
private $reading = false;
public function __construct(InputStream ...$streams)
{
$this->streams = $streams;
}
/** @inheritDoc */
public function read(): Promise
{
if ($this->reading) {
throw new PendingReadError;
}
if (!$this->streams) {
return new Success(null);
}
return call(function () {
$this->reading = true;
try {
while ($this->streams) {
$chunk = yield $this->streams[0]->read();
if ($chunk === null) {
\array_shift($this->streams);
continue;
}
return $chunk;
}
return null;
} finally {
$this->reading = false;
}
});
}
}

View File

@@ -0,0 +1,70 @@
<?php
namespace Amp\ByteStream;
use Amp\Deferred;
use Amp\Failure;
use Amp\Iterator;
use Amp\Promise;
final class IteratorStream implements InputStream
{
/** @var Iterator<string> */
private $iterator;
/** @var \Throwable|null */
private $exception;
/** @var bool */
private $pending = false;
/**
* @psam-param Iterator<string> $iterator
*/
public function __construct(Iterator $iterator)
{
$this->iterator = $iterator;
}
/** @inheritdoc */
public function read(): Promise
{
if ($this->exception) {
return new Failure($this->exception);
}
if ($this->pending) {
throw new PendingReadError;
}
$this->pending = true;
/** @var Deferred<string|null> $deferred */
$deferred = new Deferred;
$this->iterator->advance()->onResolve(function ($error, $hasNextElement) use ($deferred) {
$this->pending = false;
if ($error) {
$this->exception = $error;
$deferred->fail($error);
} elseif ($hasNextElement) {
$chunk = $this->iterator->getCurrent();
if (!\is_string($chunk)) {
$this->exception = new StreamException(\sprintf(
"Unexpected iterator value of type '%s', expected string",
\is_object($chunk) ? \get_class($chunk) : \gettype($chunk)
));
$deferred->fail($this->exception);
return;
}
$deferred->resolve($chunk);
} else {
$deferred->resolve();
}
});
return $deferred->promise();
}
}

View File

@@ -0,0 +1,71 @@
<?php
namespace Amp\ByteStream;
use Amp\Promise;
use function Amp\call;
final class LineReader
{
/** @var string */
private $delimiter;
/** @var bool */
private $lineMode;
/** @var string */
private $buffer = "";
/** @var InputStream */
private $source;
public function __construct(InputStream $inputStream, string $delimiter = null)
{
$this->source = $inputStream;
$this->delimiter = $delimiter === null ? "\n" : $delimiter;
$this->lineMode = $delimiter === null;
}
/**
* @return Promise<string|null>
*/
public function readLine(): Promise
{
return call(function () {
if (false !== \strpos($this->buffer, $this->delimiter)) {
list($line, $this->buffer) = \explode($this->delimiter, $this->buffer, 2);
return $this->lineMode ? \rtrim($line, "\r") : $line;
}
while (null !== $chunk = yield $this->source->read()) {
$this->buffer .= $chunk;
if (false !== \strpos($this->buffer, $this->delimiter)) {
list($line, $this->buffer) = \explode($this->delimiter, $this->buffer, 2);
return $this->lineMode ? \rtrim($line, "\r") : $line;
}
}
if ($this->buffer === "") {
return null;
}
$line = $this->buffer;
$this->buffer = "";
return $this->lineMode ? \rtrim($line, "\r") : $line;
});
}
public function getBuffer(): string
{
return $this->buffer;
}
/**
* @return void
*/
public function clearBuffer()
{
$this->buffer = "";
}
}

176
vendor/amphp/byte-stream/lib/Message.php vendored Normal file
View File

@@ -0,0 +1,176 @@
<?php
namespace Amp\ByteStream;
use Amp\Coroutine;
use Amp\Deferred;
use Amp\Failure;
use Amp\Promise;
use Amp\Success;
/**
* Creates a buffered message from an InputStream. The message can be consumed in chunks using the read() API or it may
* be buffered and accessed in its entirety by waiting for the promise to resolve.
*
* Other implementations may extend this class to add custom properties such as a `isBinary()` flag for WebSocket
* messages.
*
* Buffering Example:
*
* $stream = new Message($inputStream);
* $content = yield $stream;
*
* Streaming Example:
*
* $stream = new Message($inputStream);
*
* while (($chunk = yield $stream->read()) !== null) {
* // Immediately use $chunk, reducing memory consumption since the entire message is never buffered.
* }
*
* @deprecated Use Amp\ByteStream\Payload instead.
*/
class Message implements InputStream, Promise
{
/** @var InputStream */
private $source;
/** @var string */
private $buffer = "";
/** @var Deferred|null */
private $pendingRead;
/** @var Coroutine|null */
private $coroutine;
/** @var bool True if onResolve() has been called. */
private $buffering = false;
/** @var Deferred|null */
private $backpressure;
/** @var bool True if the iterator has completed. */
private $complete = false;
/** @var \Throwable|null Used to fail future reads on failure. */
private $error;
/**
* @param InputStream $source An iterator that only emits strings.
*/
public function __construct(InputStream $source)
{
$this->source = $source;
}
private function consume(): \Generator
{
while (($chunk = yield $this->source->read()) !== null) {
$buffer = $this->buffer .= $chunk;
if ($buffer === "") {
continue; // Do not succeed reads with empty string.
} elseif ($this->pendingRead) {
$deferred = $this->pendingRead;
$this->pendingRead = null;
$this->buffer = "";
$deferred->resolve($buffer);
$buffer = ""; // Destroy last emitted chunk to free memory.
} elseif (!$this->buffering) {
$buffer = ""; // Destroy last emitted chunk to free memory.
$this->backpressure = new Deferred;
yield $this->backpressure->promise();
}
}
$this->complete = true;
if ($this->pendingRead) {
$deferred = $this->pendingRead;
$this->pendingRead = null;
$deferred->resolve($this->buffer !== "" ? $this->buffer : null);
$this->buffer = "";
}
return $this->buffer;
}
/** @inheritdoc */
final public function read(): Promise
{
if ($this->pendingRead) {
throw new PendingReadError;
}
if ($this->coroutine === null) {
$this->coroutine = new Coroutine($this->consume());
$this->coroutine->onResolve(function ($error) {
if ($error) {
$this->error = $error;
}
if ($this->pendingRead) {
$deferred = $this->pendingRead;
$this->pendingRead = null;
$deferred->fail($error);
}
});
}
if ($this->error) {
return new Failure($this->error);
}
if ($this->buffer !== "") {
$buffer = $this->buffer;
$this->buffer = "";
if ($this->backpressure) {
$backpressure = $this->backpressure;
$this->backpressure = null;
$backpressure->resolve();
}
return new Success($buffer);
}
if ($this->complete) {
return new Success;
}
$this->pendingRead = new Deferred;
return $this->pendingRead->promise();
}
/** @inheritdoc */
final public function onResolve(callable $onResolved)
{
$this->buffering = true;
if ($this->coroutine === null) {
$this->coroutine = new Coroutine($this->consume());
}
if ($this->backpressure) {
$backpressure = $this->backpressure;
$this->backpressure = null;
$backpressure->resolve();
}
$this->coroutine->onResolve($onResolved);
}
/**
* Exposes the source input stream.
*
* This might be required to resolve a promise with an InputStream, because promises in Amp can't be resolved with
* other promises.
*
* @return InputStream
*/
final public function getInputStream(): InputStream
{
return $this->source;
}
}

View File

@@ -0,0 +1,55 @@
<?php
namespace Amp\ByteStream;
use Amp\Deferred;
use Amp\Promise;
use Amp\Success;
class OutputBuffer implements OutputStream, Promise
{
/** @var Deferred */
private $deferred;
/** @var string */
private $contents = '';
/** @var bool */
private $closed = false;
public function __construct()
{
$this->deferred = new Deferred;
}
public function write(string $data): Promise
{
if ($this->closed) {
throw new ClosedException("The stream has already been closed.");
}
$this->contents .= $data;
return new Success(\strlen($data));
}
public function end(string $finalData = ""): Promise
{
if ($this->closed) {
throw new ClosedException("The stream has already been closed.");
}
$this->contents .= $finalData;
$this->closed = true;
$this->deferred->resolve($this->contents);
$this->contents = "";
return new Success(\strlen($finalData));
}
public function onResolve(callable $onResolved)
{
$this->deferred->promise()->onResolve($onResolved);
}
}

View File

@@ -0,0 +1,37 @@
<?php
namespace Amp\ByteStream;
use Amp\Promise;
/**
* An `OutputStream` allows writing data in chunks. Writers can wait on the returned promises to feel the backpressure.
*/
interface OutputStream
{
/**
* Writes data to the stream.
*
* @param string $data Bytes to write.
*
* @return Promise Succeeds once the data has been successfully written to the stream.
*
* @throws ClosedException If the stream has already been closed.
* @throws StreamException If writing to the stream fails.
*/
public function write(string $data): Promise;
/**
* Marks the stream as no longer writable. Optionally writes a final data chunk before. Note that this is not the
* same as forcefully closing the stream. This method waits for all pending writes to complete before closing the
* stream. Socket streams implementing this interface should only close the writable side of the stream.
*
* @param string $finalData Bytes to write.
*
* @return Promise Succeeds once the data has been successfully written to the stream.
*
* @throws ClosedException If the stream has already been closed.
* @throws StreamException If writing to the stream fails.
*/
public function end(string $finalData = ""): Promise;
}

View File

@@ -0,0 +1,92 @@
<?php
namespace Amp\ByteStream;
use Amp\Coroutine;
use Amp\Promise;
use function Amp\call;
/**
* Creates a buffered message from an InputStream. The message can be consumed in chunks using the read() API or it may
* be buffered and accessed in its entirety by calling buffer(). Once buffering is requested through buffer(), the
* stream cannot be read in chunks. On destruct any remaining data is read from the InputStream given to this class.
*/
class Payload implements InputStream
{
/** @var InputStream */
private $stream;
/** @var \Amp\Promise|null */
private $promise;
/** @var \Amp\Promise|null */
private $lastRead;
/**
* @param \Amp\ByteStream\InputStream $stream
*/
public function __construct(InputStream $stream)
{
$this->stream = $stream;
}
public function __destruct()
{
if (!$this->promise) {
Promise\rethrow(new Coroutine($this->consume()));
}
}
private function consume(): \Generator
{
try {
if ($this->lastRead && null === yield $this->lastRead) {
return;
}
while (null !== yield $this->stream->read()) {
// Discard unread bytes from message.
}
} catch (\Throwable $exception) {
// If exception is thrown here the connection closed anyway.
}
}
/**
* @inheritdoc
*
* @throws \Error If a buffered message was requested by calling buffer().
*/
final public function read(): Promise
{
if ($this->promise) {
throw new \Error("Cannot stream message data once a buffered message has been requested");
}
return $this->lastRead = $this->stream->read();
}
/**
* Buffers the entire message and resolves the returned promise then.
*
* @return Promise<string> Resolves with the entire message contents.
*/
final public function buffer(): Promise
{
if ($this->promise) {
return $this->promise;
}
return $this->promise = call(function () {
$buffer = '';
if ($this->lastRead && null === yield $this->lastRead) {
return $buffer;
}
while (null !== $chunk = yield $this->stream->read()) {
$buffer .= $chunk;
}
return $buffer;
});
}
}

View File

@@ -0,0 +1,17 @@
<?php
namespace Amp\ByteStream;
/**
* Thrown in case a second read operation is attempted while another read operation is still pending.
*/
final class PendingReadError extends \Error
{
public function __construct(
string $message = "The previous read operation must complete before read can be called again",
int $code = 0,
\Throwable $previous = null
) {
parent::__construct($message, $code, $previous);
}
}

View File

@@ -0,0 +1,262 @@
<?php
namespace Amp\ByteStream;
use Amp\Deferred;
use Amp\Loop;
use Amp\Promise;
use Amp\Success;
/**
* Input stream abstraction for PHP's stream resources.
*/
final class ResourceInputStream implements InputStream
{
const DEFAULT_CHUNK_SIZE = 8192;
/** @var resource|null */
private $resource;
/** @var string */
private $watcher;
/** @var Deferred|null */
private $deferred;
/** @var bool */
private $readable = true;
/** @var int */
private $chunkSize;
/** @var bool */
private $useSingleRead;
/** @var callable */
private $immediateCallable;
/** @var string|null */
private $immediateWatcher;
/**
* @param resource $stream Stream resource.
* @param int $chunkSize Chunk size per read operation.
*
* @throws \Error If an invalid stream or parameter has been passed.
*/
public function __construct($stream, int $chunkSize = self::DEFAULT_CHUNK_SIZE)
{
if (!\is_resource($stream) || \get_resource_type($stream) !== 'stream') {
throw new \Error("Expected a valid stream");
}
$meta = \stream_get_meta_data($stream);
$useSingleRead = $meta["stream_type"] === "udp_socket" || $meta["stream_type"] === "STDIO";
$this->useSingleRead = $useSingleRead;
if (\strpos($meta["mode"], "r") === false && \strpos($meta["mode"], "+") === false) {
throw new \Error("Expected a readable stream");
}
\stream_set_blocking($stream, false);
\stream_set_read_buffer($stream, 0);
$this->resource = &$stream;
$this->chunkSize = &$chunkSize;
$deferred = &$this->deferred;
$readable = &$this->readable;
$this->watcher = Loop::onReadable($this->resource, static function ($watcher) use (
&$deferred,
&$readable,
&$stream,
&$chunkSize,
$useSingleRead
) {
if ($useSingleRead) {
$data = @\fread($stream, $chunkSize);
} else {
$data = @\stream_get_contents($stream, $chunkSize);
}
\assert($data !== false, "Trying to read from a previously fclose()'d resource. Do NOT manually fclose() resources the loop still has a reference to.");
// Error suppression, because pthreads does crazy things with resources,
// which might be closed during two operations.
// See https://github.com/amphp/byte-stream/issues/32
if ($data === '' && @\feof($stream)) {
$readable = false;
$stream = null;
$data = null; // Stream closed, resolve read with null.
Loop::cancel($watcher);
} else {
Loop::disable($watcher);
}
$temp = $deferred;
$deferred = null;
\assert($temp instanceof Deferred);
$temp->resolve($data);
});
$this->immediateCallable = static function ($watcherId, $data) use (&$deferred) {
$temp = $deferred;
$deferred = null;
\assert($temp instanceof Deferred);
$temp->resolve($data);
};
Loop::disable($this->watcher);
}
/** @inheritdoc */
public function read(): Promise
{
if ($this->deferred !== null) {
throw new PendingReadError;
}
if (!$this->readable) {
return new Success; // Resolve with null on closed stream.
}
\assert($this->resource !== null);
// Attempt a direct read, because Windows suffers from slow I/O on STDIN otherwise.
if ($this->useSingleRead) {
$data = @\fread($this->resource, $this->chunkSize);
} else {
$data = @\stream_get_contents($this->resource, $this->chunkSize);
}
\assert($data !== false, "Trying to read from a previously fclose()'d resource. Do NOT manually fclose() resources the loop still has a reference to.");
if ($data === '') {
// Error suppression, because pthreads does crazy things with resources,
// which might be closed during two operations.
// See https://github.com/amphp/byte-stream/issues/32
if (@\feof($this->resource)) {
$this->readable = false;
$this->resource = null;
Loop::cancel($this->watcher);
return new Success; // Stream closed, resolve read with null.
}
$this->deferred = new Deferred;
Loop::enable($this->watcher);
return $this->deferred->promise();
}
// Prevent an immediate read → write loop from blocking everything
// See e.g. examples/benchmark-throughput.php
$this->deferred = new Deferred;
$this->immediateWatcher = Loop::defer($this->immediateCallable, $data);
return $this->deferred->promise();
}
/**
* Closes the stream forcefully. Multiple `close()` calls are ignored.
*
* @return void
*/
public function close()
{
if (\is_resource($this->resource)) {
// Error suppression, as resource might already be closed
$meta = @\stream_get_meta_data($this->resource);
if ($meta && \strpos($meta["mode"], "+") !== false) {
@\stream_socket_shutdown($this->resource, \STREAM_SHUT_RD);
} else {
/** @psalm-suppress InvalidPropertyAssignmentValue */
@\fclose($this->resource);
}
}
$this->free();
}
/**
* Nulls reference to resource, marks stream unreadable, and succeeds any pending read with null.
*
* @return void
*/
private function free()
{
$this->readable = false;
$this->resource = null;
if ($this->deferred !== null) {
$deferred = $this->deferred;
$this->deferred = null;
$deferred->resolve();
}
Loop::cancel($this->watcher);
if ($this->immediateWatcher !== null) {
Loop::cancel($this->immediateWatcher);
}
}
/**
* @return resource|null The stream resource or null if the stream has closed.
*/
public function getResource()
{
return $this->resource;
}
/**
* @return void
*/
public function setChunkSize(int $chunkSize)
{
$this->chunkSize = $chunkSize;
}
/**
* References the read watcher, so the loop keeps running in case there's an active read.
*
* @return void
*
* @see Loop::reference()
*/
public function reference()
{
if (!$this->resource) {
throw new \Error("Resource has already been freed");
}
Loop::reference($this->watcher);
}
/**
* Unreferences the read watcher, so the loop doesn't keep running even if there are active reads.
*
* @return void
*
* @see Loop::unreference()
*/
public function unreference()
{
if (!$this->resource) {
throw new \Error("Resource has already been freed");
}
Loop::unreference($this->watcher);
}
public function __destruct()
{
if ($this->resource !== null) {
$this->free();
}
}
}

View File

@@ -0,0 +1,321 @@
<?php
namespace Amp\ByteStream;
use Amp\Deferred;
use Amp\Failure;
use Amp\Loop;
use Amp\Promise;
use Amp\Success;
/**
* Output stream abstraction for PHP's stream resources.
*/
final class ResourceOutputStream implements OutputStream
{
const MAX_CONSECUTIVE_EMPTY_WRITES = 3;
const LARGE_CHUNK_SIZE = 128 * 1024;
/** @var resource|null */
private $resource;
/** @var string */
private $watcher;
/** @var \SplQueue<array> */
private $writes;
/** @var bool */
private $writable = true;
/** @var int|null */
private $chunkSize;
/**
* @param resource $stream Stream resource.
* @param int|null $chunkSize Chunk size per `fwrite()` operation.
*/
public function __construct($stream, int $chunkSize = null)
{
if (!\is_resource($stream) || \get_resource_type($stream) !== 'stream') {
throw new \Error("Expected a valid stream");
}
$meta = \stream_get_meta_data($stream);
if (\strpos($meta["mode"], "r") !== false && \strpos($meta["mode"], "+") === false) {
throw new \Error("Expected a writable stream");
}
\stream_set_blocking($stream, false);
\stream_set_write_buffer($stream, 0);
$this->resource = $stream;
$this->chunkSize = &$chunkSize;
$writes = $this->writes = new \SplQueue;
$writable = &$this->writable;
$resource = &$this->resource;
$this->watcher = Loop::onWritable($stream, static function ($watcher, $stream) use ($writes, &$chunkSize, &$writable, &$resource) {
static $emptyWrites = 0;
try {
while (!$writes->isEmpty()) {
/** @var Deferred $deferred */
list($data, $previous, $deferred) = $writes->shift();
$length = \strlen($data);
if ($length === 0) {
$deferred->resolve(0);
continue;
}
if (!\is_resource($stream) || (($metaData = @\stream_get_meta_data($stream)) && $metaData['eof'])) {
throw new ClosedException("The stream was closed by the peer");
}
// Error reporting suppressed since fwrite() emits E_WARNING if the pipe is broken or the buffer is full.
// Use conditional, because PHP doesn't like getting null passed
if ($chunkSize) {
$written = @\fwrite($stream, $data, $chunkSize);
} else {
$written = @\fwrite($stream, $data);
}
\assert(
$written !== false || \PHP_VERSION_ID >= 70400, // PHP 7.4+ returns false on EPIPE.
"Trying to write on a previously fclose()'d resource. Do NOT manually fclose() resources the still referenced in the loop."
);
// PHP 7.4.0 and 7.4.1 may return false on EAGAIN.
if ($written === false && \PHP_VERSION_ID >= 70402) {
$message = "Failed to write to stream";
if ($error = \error_get_last()) {
$message .= \sprintf("; %s", $error["message"]);
}
throw new StreamException($message);
}
// Broken pipes between processes on macOS/FreeBSD do not detect EOF properly.
if ($written === 0 || $written === false) {
if ($emptyWrites++ > self::MAX_CONSECUTIVE_EMPTY_WRITES) {
$message = "Failed to write to stream after multiple attempts";
if ($error = \error_get_last()) {
$message .= \sprintf("; %s", $error["message"]);
}
throw new StreamException($message);
}
$writes->unshift([$data, $previous, $deferred]);
return;
}
$emptyWrites = 0;
if ($length > $written) {
$data = \substr($data, $written);
$writes->unshift([$data, $written + $previous, $deferred]);
return;
}
$deferred->resolve($written + $previous);
}
} catch (\Throwable $exception) {
$resource = null;
$writable = false;
/** @psalm-suppress PossiblyUndefinedVariable */
$deferred->fail($exception);
while (!$writes->isEmpty()) {
list(, , $deferred) = $writes->shift();
$deferred->fail($exception);
}
Loop::cancel($watcher);
} finally {
if ($writes->isEmpty()) {
Loop::disable($watcher);
}
}
});
Loop::disable($this->watcher);
}
/**
* Writes data to the stream.
*
* @param string $data Bytes to write.
*
* @return Promise Succeeds once the data has been successfully written to the stream.
*
* @throws ClosedException If the stream has already been closed.
*/
public function write(string $data): Promise
{
return $this->send($data, false);
}
/**
* Closes the stream after all pending writes have been completed. Optionally writes a final data chunk before.
*
* @param string $finalData Bytes to write.
*
* @return Promise Succeeds once the data has been successfully written to the stream.
*
* @throws ClosedException If the stream has already been closed.
*/
public function end(string $finalData = ""): Promise
{
return $this->send($finalData, true);
}
private function send(string $data, bool $end = false): Promise
{
if (!$this->writable) {
return new Failure(new ClosedException("The stream is not writable"));
}
$length = \strlen($data);
$written = 0;
if ($end) {
$this->writable = false;
}
if ($this->writes->isEmpty()) {
if ($length === 0) {
if ($end) {
$this->close();
}
return new Success(0);
}
if (!\is_resource($this->resource) || (($metaData = @\stream_get_meta_data($this->resource)) && $metaData['eof'])) {
return new Failure(new ClosedException("The stream was closed by the peer"));
}
// Error reporting suppressed since fwrite() emits E_WARNING if the pipe is broken or the buffer is full.
// Use conditional, because PHP doesn't like getting null passed.
if ($this->chunkSize) {
$written = @\fwrite($this->resource, $data, $this->chunkSize);
} else {
$written = @\fwrite($this->resource, $data);
}
\assert(
$written !== false || \PHP_VERSION_ID >= 70400, // PHP 7.4+ returns false on EPIPE.
"Trying to write on a previously fclose()'d resource. Do NOT manually fclose() resources the still referenced in the loop."
);
// PHP 7.4.0 and 7.4.1 may return false on EAGAIN.
if ($written === false && \PHP_VERSION_ID >= 70402) {
$message = "Failed to write to stream";
if ($error = \error_get_last()) {
$message .= \sprintf("; %s", $error["message"]);
}
return new Failure(new StreamException($message));
}
$written = (int) $written; // Cast potential false to 0.
if ($length === $written) {
if ($end) {
$this->close();
}
return new Success($written);
}
$data = \substr($data, $written);
}
$deferred = new Deferred;
if ($length - $written > self::LARGE_CHUNK_SIZE) {
$chunks = \str_split($data, self::LARGE_CHUNK_SIZE);
$data = \array_pop($chunks);
foreach ($chunks as $chunk) {
$this->writes->push([$chunk, $written, new Deferred]);
$written += self::LARGE_CHUNK_SIZE;
}
}
$this->writes->push([$data, $written, $deferred]);
Loop::enable($this->watcher);
$promise = $deferred->promise();
if ($end) {
$promise->onResolve([$this, "close"]);
}
return $promise;
}
/**
* Closes the stream forcefully. Multiple `close()` calls are ignored.
*
* @return void
*/
public function close()
{
if (\is_resource($this->resource)) {
// Error suppression, as resource might already be closed
$meta = @\stream_get_meta_data($this->resource);
if ($meta && \strpos($meta["mode"], "+") !== false) {
@\stream_socket_shutdown($this->resource, \STREAM_SHUT_WR);
} else {
/** @psalm-suppress InvalidPropertyAssignmentValue psalm reports this as closed-resource */
@\fclose($this->resource);
}
}
$this->free();
}
/**
* Nulls reference to resource, marks stream unwritable, and fails any pending write.
*
* @return void
*/
private function free()
{
$this->resource = null;
$this->writable = false;
if (!$this->writes->isEmpty()) {
$exception = new ClosedException("The socket was closed before writing completed");
do {
/** @var Deferred $deferred */
list(, , $deferred) = $this->writes->shift();
$deferred->fail($exception);
} while (!$this->writes->isEmpty());
}
Loop::cancel($this->watcher);
}
/**
* @return resource|null Stream resource or null if end() has been called or the stream closed.
*/
public function getResource()
{
return $this->resource;
}
/**
* @return void
*/
public function setChunkSize(int $chunkSize)
{
$this->chunkSize = $chunkSize;
}
public function __destruct()
{
if ($this->resource !== null) {
$this->free();
}
}
}

View File

@@ -0,0 +1,7 @@
<?php
namespace Amp\ByteStream;
class StreamException extends \Exception
{
}

View File

@@ -0,0 +1,112 @@
<?php
namespace Amp\ByteStream;
use Amp\Promise;
use function Amp\call;
/**
* Allows decompression of input streams using Zlib.
*/
final class ZlibInputStream implements InputStream
{
/** @var InputStream|null */
private $source;
/** @var int */
private $encoding;
/** @var array */
private $options;
/** @var resource|null */
private $resource;
/**
* @param InputStream $source Input stream to read compressed data from.
* @param int $encoding Compression algorithm used, see `inflate_init()`.
* @param array $options Algorithm options, see `inflate_init()`.
*
* @throws StreamException
* @throws \Error
*
* @see http://php.net/manual/en/function.inflate-init.php
*/
public function __construct(InputStream $source, int $encoding, array $options = [])
{
$this->source = $source;
$this->encoding = $encoding;
$this->options = $options;
$this->resource = @\inflate_init($encoding, $options);
if ($this->resource === false) {
throw new StreamException("Failed initializing deflate context");
}
}
/** @inheritdoc */
public function read(): Promise
{
return call(function () {
if ($this->resource === null) {
return null;
}
\assert($this->source !== null);
$data = yield $this->source->read();
// Needs a double guard, as stream might have been closed while reading
/** @psalm-suppress ParadoxicalCondition */
if ($this->resource === null) {
return null;
}
if ($data === null) {
$decompressed = @\inflate_add($this->resource, "", \ZLIB_FINISH);
if ($decompressed === false) {
throw new StreamException("Failed adding data to deflate context");
}
$this->close();
return $decompressed;
}
$decompressed = @\inflate_add($this->resource, $data, \ZLIB_SYNC_FLUSH);
if ($decompressed === false) {
throw new StreamException("Failed adding data to deflate context");
}
return $decompressed;
});
}
/**
* @internal
* @return void
*/
private function close()
{
$this->resource = null;
$this->source = null;
}
/**
* Gets the used compression encoding.
*
* @return int Encoding specified on construction time.
*/
public function getEncoding(): int
{
return $this->encoding;
}
/**
* Gets the used compression options.
*
* @return array Options array passed on construction time.
*/
public function getOptions(): array
{
return $this->options;
}
}

View File

@@ -0,0 +1,119 @@
<?php
namespace Amp\ByteStream;
use Amp\Promise;
/**
* Allows compression of output streams using Zlib.
*/
final class ZlibOutputStream implements OutputStream
{
/** @var OutputStream|null */
private $destination;
/** @var int */
private $encoding;
/** @var array */
private $options;
/** @var resource|null */
private $resource;
/**
* @param OutputStream $destination Output stream to write the compressed data to.
* @param int $encoding Compression encoding to use, see `deflate_init()`.
* @param array $options Compression options to use, see `deflate_init()`.
*
* @throws StreamException If an invalid encoding or invalid options have been passed.
*
* @see http://php.net/manual/en/function.deflate-init.php
*/
public function __construct(OutputStream $destination, int $encoding, array $options = [])
{
$this->destination = $destination;
$this->encoding = $encoding;
$this->options = $options;
$this->resource = @\deflate_init($encoding, $options);
if ($this->resource === false) {
throw new StreamException("Failed initializing deflate context");
}
}
/** @inheritdoc */
public function write(string $data): Promise
{
if ($this->resource === null) {
throw new ClosedException("The stream has already been closed");
}
\assert($this->destination !== null);
$compressed = \deflate_add($this->resource, $data, \ZLIB_SYNC_FLUSH);
if ($compressed === false) {
throw new StreamException("Failed adding data to deflate context");
}
$promise = $this->destination->write($compressed);
$promise->onResolve(function ($error) {
if ($error) {
$this->close();
}
});
return $promise;
}
/** @inheritdoc */
public function end(string $finalData = ""): Promise
{
if ($this->resource === null) {
throw new ClosedException("The stream has already been closed");
}
\assert($this->destination !== null);
$compressed = \deflate_add($this->resource, $finalData, \ZLIB_FINISH);
if ($compressed === false) {
throw new StreamException("Failed adding data to deflate context");
}
$promise = $this->destination->end($compressed);
$promise->onResolve(function () {
$this->close();
});
return $promise;
}
/**
* @internal
* @return void
*/
private function close()
{
$this->resource = null;
$this->destination = null;
}
/**
* Gets the used compression encoding.
*
* @return int Encoding specified on construction time.
*/
public function getEncoding(): int
{
return $this->encoding;
}
/**
* Gets the used compression options.
*
* @return array Options array passed on construction time.
*/
public function getOptions(): array
{
return $this->options;
}
}

View File

@@ -0,0 +1,188 @@
<?php
namespace Amp\ByteStream;
use Amp\Iterator;
use Amp\Loop;
use Amp\Producer;
use Amp\Promise;
use function Amp\call;
// @codeCoverageIgnoreStart
if (\strlen('…') !== 3) {
throw new \Error(
'The mbstring.func_overload ini setting is enabled. It must be disabled to use the stream package.'
);
} // @codeCoverageIgnoreEnd
if (!\defined('STDOUT')) {
\define('STDOUT', \fopen('php://stdout', 'w'));
}
if (!\defined('STDERR')) {
\define('STDERR', \fopen('php://stderr', 'w'));
}
/**
* @param \Amp\ByteStream\InputStream $source
* @param \Amp\ByteStream\OutputStream $destination
*
* @return \Amp\Promise
*/
function pipe(InputStream $source, OutputStream $destination): Promise
{
return call(function () use ($source, $destination): \Generator {
$written = 0;
while (($chunk = yield $source->read()) !== null) {
$written += \strlen($chunk);
$writePromise = $destination->write($chunk);
$chunk = null; // free memory
yield $writePromise;
}
return $written;
});
}
/**
* @param \Amp\ByteStream\InputStream $source
*
* @return \Amp\Promise
*/
function buffer(InputStream $source): Promise
{
return call(function () use ($source): \Generator {
$buffer = "";
while (($chunk = yield $source->read()) !== null) {
$buffer .= $chunk;
$chunk = null; // free memory
}
return $buffer;
});
}
/**
* The php://input input buffer stream for the process associated with the currently active event loop.
*
* @return ResourceInputStream
*/
function getInputBufferStream(): ResourceInputStream
{
static $key = InputStream::class . '\\input';
$stream = Loop::getState($key);
if (!$stream) {
$stream = new ResourceInputStream(\fopen('php://input', 'rb'));
Loop::setState($key, $stream);
}
return $stream;
}
/**
* The php://output output buffer stream for the process associated with the currently active event loop.
*
* @return ResourceOutputStream
*/
function getOutputBufferStream(): ResourceOutputStream
{
static $key = OutputStream::class . '\\output';
$stream = Loop::getState($key);
if (!$stream) {
$stream = new ResourceOutputStream(\fopen('php://output', 'wb'));
Loop::setState($key, $stream);
}
return $stream;
}
/**
* The STDIN stream for the process associated with the currently active event loop.
*
* @return ResourceInputStream
*/
function getStdin(): ResourceInputStream
{
static $key = InputStream::class . '\\stdin';
$stream = Loop::getState($key);
if (!$stream) {
$stream = new ResourceInputStream(\STDIN);
Loop::setState($key, $stream);
}
return $stream;
}
/**
* The STDOUT stream for the process associated with the currently active event loop.
*
* @return ResourceOutputStream
*/
function getStdout(): ResourceOutputStream
{
static $key = OutputStream::class . '\\stdout';
$stream = Loop::getState($key);
if (!$stream) {
$stream = new ResourceOutputStream(\STDOUT);
Loop::setState($key, $stream);
}
return $stream;
}
/**
* The STDERR stream for the process associated with the currently active event loop.
*
* @return ResourceOutputStream
*/
function getStderr(): ResourceOutputStream
{
static $key = OutputStream::class . '\\stderr';
$stream = Loop::getState($key);
if (!$stream) {
$stream = new ResourceOutputStream(\STDERR);
Loop::setState($key, $stream);
}
return $stream;
}
function parseLineDelimitedJson(InputStream $stream, bool $assoc = false, int $depth = 512, int $options = 0): Iterator
{
return new Producer(static function (callable $emit) use ($stream, $assoc, $depth, $options) {
$reader = new LineReader($stream);
while (null !== $line = yield $reader->readLine()) {
$line = \trim($line);
if ($line === '') {
continue;
}
/** @noinspection PhpComposerExtensionStubsInspection */
$data = \json_decode($line, $assoc, $depth, $options);
/** @noinspection PhpComposerExtensionStubsInspection */
$error = \json_last_error();
/** @noinspection PhpComposerExtensionStubsInspection */
if ($error !== \JSON_ERROR_NONE) {
/** @noinspection PhpComposerExtensionStubsInspection */
throw new StreamException('Failed to parse JSON: ' . \json_last_error_msg(), $error);
}
yield $emit($data);
}
});
}

53
vendor/amphp/byte-stream/psalm.xml vendored Normal file
View File

@@ -0,0 +1,53 @@
<?xml version="1.0"?>
<psalm
totallyTyped="false"
errorLevel="2"
phpVersion="7.0"
resolveFromConfigFile="true"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="https://getpsalm.org/schema/config"
xsi:schemaLocation="https://getpsalm.org/schema/config vendor/vimeo/psalm/config.xsd"
>
<projectFiles>
<directory name="examples"/>
<directory name="lib"/>
<ignoreFiles>
<directory name="vendor"/>
</ignoreFiles>
</projectFiles>
<issueHandlers>
<StringIncrement>
<errorLevel type="suppress">
<directory name="examples"/>
<directory name="lib"/>
</errorLevel>
</StringIncrement>
<RedundantConditionGivenDocblockType>
<errorLevel type="suppress">
<directory name="lib"/>
</errorLevel>
</RedundantConditionGivenDocblockType>
<DocblockTypeContradiction>
<errorLevel type="suppress">
<directory name="lib"/>
</errorLevel>
</DocblockTypeContradiction>
<MissingClosureParamType>
<errorLevel type="suppress">
<directory name="examples"/>
<directory name="lib"/>
</errorLevel>
</MissingClosureParamType>
<MissingClosureReturnType>
<errorLevel type="suppress">
<directory name="examples"/>
<directory name="lib"/>
</errorLevel>
</MissingClosureReturnType>
</issueHandlers>
</psalm>