<?php
// apm_module.php v3.0.0
//   APM child-process connector for PHP 7.4+
//   IPC via stdin/stdout binary frames — identical wire protocol to Node.js connector.
//
//   APM injects env vars:
//     APM=1         — confirms process is managed by APM
//     APM_INDEX     — 0-based instance index (if env_index configured)
//
//   Usage:
//     require_once __DIR__ . '/apm_module.php';
//     $apm = new ApmModule(function(ApmSession $s) {
//         $s->write("Hello World", ['x-status' => '200', 'content-type' => 'text/plain']);
//         $s->close();
//     });
//     $apm->onChannel = function($ch, $data, $reply) { if ($reply) $reply(['ok' => true]); };
//     $apm->onStream  = function(IpcStream $s) { $s->accept(['name' => 'worker1']); };
//     $apm->run();

// Self-update mode: `php apm_module.php -update` downloads the latest connector
// and overwrites this very file, then exits without starting the connector.
if (PHP_SAPI === 'cli' && isset($argv) && in_array('-update', $argv, true)) {
    $url  = 'https://processmanager.dev/connectors/apm_module.php';
    $dest = __FILE__;
    echo "Updating {$dest} ...\n";

    // Warnings are suppressed because the failure is reported explicitly below.
    $data = @file_get_contents($url);
    if ($data === false) { fwrite(STDERR, "Update failed: could not download\n"); exit(1); }

    // Never replace a working connector with an empty body or an HTML error
    // page: the payload must at least look like a PHP source file.
    if (strncmp($data, '<?php', 5) !== 0) {
        fwrite(STDERR, "Update failed: downloaded content is not a PHP file\n");
        exit(1);
    }

    // file_put_contents() returns the byte count; a short write is a failure too.
    if (file_put_contents($dest, $data) !== strlen($data)) {
        fwrite(STDERR, "Update failed: write error\n");
        exit(1);
    }
    echo "Updated.\n";
    exit(0);
}

// The connector only makes sense as a child of the APM daemon, which announces
// itself through the APM environment variable. Abort when it is missing or falsy.
if (!isset($_SERVER['APM']) || !$_SERVER['APM']) {
    fwrite(STDERR, "[APM] Must run under APM daemon (APM env var not set)\n");
    exit(1);
}

// Instance index handed over by the daemon; falls back to '0' when not provided.
define('APM_INDEX', isset($_SERVER['APM_INDEX']) ? $_SERVER['APM_INDEX'] : '0');

/**
 * Encodes one IPC frame and writes it to STDOUT.
 *
 * Wire layout: 0x05, 4-byte big-endian length, JSON header, 0x03, binary payload.
 * The length field counts the JSON header, the 0x03 separator and the payload.
 *
 * If the header cannot be JSON-encoded (e.g. NAN/INF floats or invalid UTF-8),
 * the frame is dropped and the reason is logged to STDERR instead of emitting
 * a frame with an empty header.
 *
 * @param array  $jsonHdr Frame header, serialised as JSON.
 * @param string $binary  Raw payload appended after the separator.
 */
function _apm_send_frame(array $jsonHdr, string $binary = ''): void
{
    $json = json_encode($jsonHdr, JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES);
    if ($json === false) {
        fwrite(STDERR, '[APM] frame dropped, header is not JSON-encodable: ' . json_last_error_msg() . "\n");
        return;
    }

    $len = strlen($json) + 1 + strlen($binary);
    fwrite(STDOUT, "\x05" . pack('N', $len) . $json . "\x03" . $binary);
    // Flush immediately so the frame is not held back in PHP's stream buffer.
    fflush(STDOUT);
}

// ─── Session ──────────────────────────────────────────────────────────────────

/**
 * One client connection as seen by the connector.
 *
 * ApmModule fills the public request fields from the incoming frame headers;
 * the methods below send session-scoped commands back through the IPC channel.
 */
class ApmSession
{
    public string  $sessionId    = '';
    public string  $sessionType  = 'new';
    public array   $sessionData  = [];
    public string  $protocol     = '';
    public ?string $method       = null;
    public string  $path         = '/';
    public array   $path_array   = [];
    public array   $query        = [];
    public array   $query_object = [];
    public array   $cookies      = [];
    public array   $headers      = [];
    public string  $remoteIp     = '';
    public string  $instanceId;

    // Set once the connection has been reported closed.
    private bool   $deleted      = false;
    /** @internal Data frames queued while no onData handler was installed. */
    public array $backlog  = [];
    public          $onData      = null;  // callable($data, $isBinary)
    public          $onClose     = null;  // callable()

    public function __construct()
    {
        $this->instanceId = APM_INDEX;
    }

    /** True until the session has been marked as deleted. */
    public function active(): bool
    {
        return $this->deleted === false;
    }

    /**
     * Sends a "write" command carrying $data. Entries of $headers are merged
     * into the frame header; dataType defaults to 'text' when not supplied.
     */
    public function write(string $data, array $headers = []): void
    {
        $frameHeader = array_merge(['_command' => 'write'], $headers);
        $frameHeader['dataType'] ??= 'text';
        $this->_send($frameHeader, $data);
    }

    /** Sends a "writeRaw" command carrying $data as the frame payload. */
    public function writeRaw(string $data): void
    {
        $frameHeader = ['_command' => 'writeRaw'];
        $this->_send($frameHeader, $data);
    }

    /** Sends a "closeConnection" command with the given code and reason. */
    public function close(int $code = 200, string $reason = ''): void
    {
        $frameHeader = [
            '_command' => 'closeConnection',
            'code'     => $code,
            '_reason'  => $reason,
        ];
        $this->_send($frameHeader);
    }

    /** Sends the current $sessionData in a "saveSessionData" command. */
    public function saveSessionData(): void
    {
        $frameHeader = [
            '_command'     => 'saveSessionData',
            '_sessionData' => $this->sessionData,
        ];
        $this->_send($frameHeader);
    }

    /** @internal */
    public function _markDeleted(): void
    {
        $this->deleted = true;
    }

    /**
     * @internal Stamps the header with this session's id and emits the frame.
     */
    public function _send(array $jsonHdr, string $binary = ''): void
    {
        $jsonHdr['_session'] = $this->sessionId;
        _apm_send_frame($jsonHdr, $binary);
    }
}

// ─── IPC Stream ──────────────────────────────────────────────────────────────

/**
 * One IPC stream, identified by its stream id.
 *
 * ApmModule creates instances both for outgoing requestStream() calls (role
 * 'mediator') and for incoming stream_request frames (role 'peer'). Once the
 * stream is closed or rejected, every sending method becomes a no-op.
 */
class IpcStream
{
    public string  $id      = '';
    public array   $header  = [];
    public ?string $peerId  = null;
    public         $onData   = null;  // callable($data, $peer)
    public         $onAttach = null;  // callable($peer) — mediator only
    public         $onDetach = null;  // callable($peer) — mediator only
    public         $onClose  = null;  // callable()

    private bool   $closed  = false;
    // Stored by the constructor; not read anywhere in this class.
    private string $role    = 'mediator';

    /**
     * @param string     $id     Stream id used as `_sid` in every frame.
     * @param array|null $header Header supplied when the stream was opened; null becomes [].
     * @param string     $role   'mediator' or 'peer', as passed by ApmModule.
     */
    public function __construct(string $id, ?array $header, string $role)
    {
        $this->id     = $id;
        $this->header = $header ?? [];
        $this->role   = $role;
    }

    /** True until the stream has been closed or rejected. */
    public function active(): bool { return !$this->closed; }

    /** Sends $data as a text stream_data frame; ignored once closed. */
    public function write(string $data): void
    {
        if ($this->closed) return;
        _apm_send_frame(['_command' => 'stream_data', '_sid' => $this->id, 'dataType' => 'text'], $data);
    }

    /** Sends $data as a binary stream_data frame; ignored once closed. */
    public function writeBinary(string $data): void
    {
        if ($this->closed) return;
        _apm_send_frame(['_command' => 'stream_data', '_sid' => $this->id, 'dataType' => 'binary'], $data);
    }

    /** Marks the stream closed and sends stream_close; later calls do nothing. */
    public function close(): void
    {
        if ($this->closed) return;
        $this->closed = true;
        _apm_send_frame(['_command' => 'stream_close', '_sid' => $this->id]);
    }

    /**
     * Sends stream_accept, optionally with a header for the requester.
     * On an already closed stream nothing is sent, matching write()/close().
     *
     * @return $this
     */
    public function accept(?array $header = null): self
    {
        if ($this->closed) return $this;
        $hdr = ['_command' => 'stream_accept', '_sid' => $this->id];
        if ($header !== null) $hdr['header'] = $header;
        _apm_send_frame($hdr);
        return $this;
    }

    /**
     * Sends stream_reject and marks the stream closed.
     * On an already closed stream nothing is sent, matching write()/close().
     */
    public function reject(): void
    {
        if ($this->closed) return;
        $this->closed = true;
        _apm_send_frame(['_command' => 'stream_reject', '_sid' => $this->id]);
    }

    /** @internal */ public function _markClosed(): void { $this->closed = true; }
    /** @internal */ public function _setPeerId(string $id): void { $this->peerId = $id; }
}

// ─── Module ───────────────────────────────────────────────────────────────────

/**
 * Connector entry point: reads frames from STDIN, dispatches them to sessions,
 * IPC channel handlers and IPC streams, and offers blocking request helpers.
 */
class ApmModule
{
    /** @var callable Invoked with the new ApmSession when its first frame arrives. */
    private          $onConnect;
    /** @var array Live ApmSession objects keyed by session id. */
    private array    $sessions = [];
    /** @var array IpcStream objects keyed by stream id. */
    private array    $streams  = [];
    /** @var array Outstanding blocking calls: id => ['result' => mixed, 'done' => bool]. */
    private array    $pending  = [];
    /** @var string Bytes read from STDIN that do not yet form a complete frame. */
    private string   $buf      = '';
    private int      $ridCtr   = 0;
    public  string   $instanceId;

    // IPC callbacks
    public $onChannel = null;  // callable($channel, $data, $replyFn|null)
    public $onStream  = null;  // callable(IpcStream)

    /**
     * @param callable $onConnect Called as $onConnect(ApmSession) for every new session.
     */
    public function __construct(callable $onConnect)
    {
        $this->onConnect  = $onConnect;
        $this->instanceId = APM_INDEX;
    }

    /** Sends a "metric" command with the given name, value and type. */
    public function metric(string $name, float $value, string $type = 'counter'): void
    {
        _apm_send_frame(['_command' => 'metric', 'name' => $name, 'value' => $value, 'type' => $type]);
    }

    /** Sends a "dash_value" command for the given module id. */
    public function setDashValue(int $moduleId, float $value, string $color = ''): void
    {
        _apm_send_frame(['_command' => 'dash_value', 'module_id' => $moduleId, 'value' => $value, 'color' => $color]);
    }

    // ── IPC: Channel ─────────────────────────────────────────────────────────

    /** Fire-and-forget message on an IPC channel. */
    public function send(string $channel, $data = null): void
    {
        _apm_send_frame(['_command' => 'ipc_send', 'channel' => $channel, 'data' => $data]);
    }

    /**
     * Blocking request/reply. Returns reply data or null on timeout.
     * Note: PHP is single-threaded so the reply comes via the event loop.
     * Call this only from the main loop context.
     *
     * While waiting, every other incoming frame is dispatched as usual.
     *
     * @param string $channel Target channel.
     * @param mixed  $data    JSON-encodable request payload.
     * @param int    $timeout Milliseconds; 0 means the 5000 ms default. One extra
     *                        second of grace is added locally before giving up.
     * @return mixed Reply data, or null on timeout / STDIN EOF.
     */
    public function request(string $channel, $data = null, int $timeout = 0)
    {
        $rid = $this->_rid();
        $hdr = ['_command' => 'ipc_request', 'channel' => $channel, 'data' => $data, '_rid' => $rid];
        if ($timeout > 0) $hdr['timeout'] = $timeout;

        // Register before sending so the reply always finds its slot.
        $this->pending[$rid] = ['result' => null, 'done' => false];
        _apm_send_frame($hdr);

        $this->_awaitPending($rid, microtime(true) + (($timeout ?: 5000) + 1000) / 1000.0);

        $result = $this->pending[$rid]['result'] ?? null;
        unset($this->pending[$rid]);
        return $result;
    }

    // ── IPC: Stream ──────────────────────────────────────────────────────────

    /**
     * Blocking stream open. Returns IpcStream or null.
     *
     * @param string     $channel Target channel.
     * @param array|null $header  Optional header sent along with the open request.
     * @param int        $timeout Milliseconds; 0 means the 10000 ms default. One extra
     *                            second of grace is added locally before giving up.
     * @return IpcStream|null The opened stream, or null when rejected, timed out or on EOF.
     */
    public function requestStream(string $channel, ?array $header = null, int $timeout = 0): ?IpcStream
    {
        $sid = $this->_rid();
        $hdr = ['_command' => 'stream_open', 'channel' => $channel, '_sid' => $sid];
        if ($header !== null) $hdr['header'] = $header;
        if ($timeout > 0) $hdr['timeout'] = $timeout;

        $stream = new IpcStream($sid, $header, 'mediator');
        $this->streams[$sid] = $stream;
        $this->pending[$sid] = ['result' => null, 'done' => false];
        _apm_send_frame($hdr);

        $this->_awaitPending($sid, microtime(true) + (($timeout ?: 10000) + 1000) / 1000.0);

        $ok = $this->pending[$sid]['result'] ?? false;
        unset($this->pending[$sid]);
        if ($ok) return $stream;

        // Rejected, timed out or EOF: make the handle inert and forget it.
        $stream->_markClosed();
        unset($this->streams[$sid]);
        return null;
    }

    // ── Event loop ───────────────────────────────────────────────────────────

    /** Reads and dispatches frames until STDIN reaches EOF. */
    public function run(): void
    {
        while ($this->_pump(null)) {
            // _pump() did the work; loop until it reports EOF.
        }
    }

    /** Builds a process-unique id for requests and streams. */
    private function _rid(): string
    {
        return APM_INDEX . '_' . (++$this->ridCtr) . '_php';
    }

    /**
     * Pumps the event loop until pending entry $key is done, the deadline
     * passes, or STDIN reaches EOF.
     *
     * @param string $key      Key into $this->pending.
     * @param float  $deadline Absolute time as returned by microtime(true).
     */
    private function _awaitPending(string $key, float $deadline): void
    {
        while (!$this->pending[$key]['done']) {
            $remaining = $deadline - microtime(true);
            if ($remaining <= 0) break;
            if (!$this->_pump($remaining)) break;
        }
    }

    /**
     * Reads one chunk from STDIN and feeds it to the frame parser.
     *
     * @param float|null $maxWait Seconds to wait for input. null performs a plain
     *                            (blocking) read; a number bounds the wait so that
     *                            callers with a deadline are not stuck on an idle pipe.
     * @return bool false once STDIN is at EOF, true otherwise.
     */
    private function _pump(?float $maxWait): bool
    {
        if ($maxWait !== null) {
            $read   = [STDIN];
            $write  = null;
            $except = null;
            $sec    = (int) floor($maxWait);
            $usec   = (int) (($maxWait - $sec) * 1000000);
            // Suppressed on purpose: where select() is unsupported for this handle
            // or gets interrupted, we fall through to the plain read below.
            $ready = @stream_select($read, $write, $except, $sec, $usec);
            if ($ready === 0) return true;  // nothing arrived within the window
        }

        $chunk = fread(STDIN, 65536);
        if ($chunk === false || $chunk === '') {
            if (feof(STDIN)) return false;
            usleep(1000);  // avoid spinning when the read returns nothing
            return true;
        }
        $this->buf .= $chunk;
        $this->_processBuffer();
        return true;
    }

    /**
     * Extracts every complete frame from the buffer and dispatches it.
     * Bytes preceding a 0x05 start marker are discarded.
     */
    private function _processBuffer(): void
    {
        while (strlen($this->buf) > 0) {
            if (ord($this->buf[0]) !== 0x05) {
                // Resynchronise on the next start marker.
                $pos = strpos($this->buf, "\x05");
                if ($pos === false) { $this->buf = ''; return; }
                $this->buf = substr($this->buf, $pos);
            }
            if (strlen($this->buf) < 5) return;

            [, $payloadLen] = unpack('N', substr($this->buf, 1, 4));
            // NOTE(review): incoming frames are taken to be length + 6 bytes, while
            // _apm_send_frame() emits length + 5 — presumably the inbound length
            // excludes the 0x03 separator; confirm against the daemon protocol.
            $frameLen = $payloadLen + 6;

            if (strlen($this->buf) < $frameLen) return;

            // Consume the frame before dispatching: handlers may re-enter this
            // method through request()/requestStream().
            $frame     = substr($this->buf, 0, $frameLen);
            $this->buf = substr($this->buf, $frameLen);
            $this->_processFrame($frame);
        }
    }

    /**
     * Decodes one frame and routes it by `_type`: IPC messages and replies,
     * stream lifecycle/data frames, and finally session frames.
     */
    private function _processFrame(string $frame): void
    {
        $sep = strpos($frame, "\x03", 5);
        if ($sep === false) return;

        $header = json_decode(substr($frame, 5, $sep - 5), true);
        // Ignore undecodable, empty or non-object headers.
        if (!is_array($header) || $header === []) return;

        $binaryData = substr($frame, $sep + 1);
        $type = $header['_type'] ?? '';

        // ── IPC frames ───────────────────────────────────────────────────────

        if ($type === 'ipc_message') {
            if ($this->onChannel !== null) {
                $channel = $header['channel'] ?? '';
                $data    = $header['data'] ?? null;
                $reply   = null;
                // A reply closure is only offered when the sender asked for one.
                if (!empty($header['_needReply']) && !empty($header['_rid'])) {
                    $rid = $header['_rid'];
                    $reply = function($d) use ($rid) {
                        _apm_send_frame(['_command' => 'ipc_reply', '_rid' => $rid, 'data' => $d]);
                    };
                }
                try { ($this->onChannel)($channel, $data, $reply); }
                catch (Throwable $e) { fwrite(STDERR, "[APM] onChannel error: {$e->getMessage()}\n"); }
            }
            return;
        }

        if ($type === 'ipc_reply') {
            $rid = $header['_rid'] ?? '';
            if ($rid && isset($this->pending[$rid])) {
                $this->pending[$rid]['result'] = $header['data'] ?? null;
                $this->pending[$rid]['done']   = true;
            }
            return;
        }

        if ($type === 'stream_request') {
            if ($this->onStream !== null) {
                $sid    = $header['_sid'] ?? '';
                $stream = new IpcStream($sid, $header['header'] ?? null, 'peer');
                $this->streams[$sid] = $stream;
                try { ($this->onStream)($stream); }
                catch (Throwable $e) { fwrite(STDERR, "[APM] onStream error: {$e->getMessage()}\n"); }
            }
            return;
        }

        if ($type === 'stream_opened') {
            $sid = $header['_sid'] ?? '';
            if (isset($this->pending[$sid])) {
                $this->pending[$sid]['result'] = true;
                $this->pending[$sid]['done']   = true;
            }
            return;
        }

        if ($type === 'stream_rejected') {
            $sid = $header['_sid'] ?? '';
            if (isset($this->pending[$sid])) {
                $this->pending[$sid]['result'] = false;
                $this->pending[$sid]['done']   = true;
            }
            if (isset($this->streams[$sid])) {
                $this->streams[$sid]->_markClosed();
                unset($this->streams[$sid]);
            }
            return;
        }

        if ($type === 'stream_attached') {
            $sid = $header['_sid'] ?? '';
            if (isset($this->streams[$sid])) {
                $this->streams[$sid]->_setPeerId($header['_peer'] ?? '');
            }
            return;
        }

        if ($type === 'stream_attach') {
            $sid = $header['_sid'] ?? '';
            $stream = $this->streams[$sid] ?? null;
            if ($stream && $stream->onAttach !== null) {
                try { ($stream->onAttach)(['id' => $header['_peer'] ?? '', 'header' => $header['header'] ?? []]); }
                catch (Throwable $e) { fwrite(STDERR, "[APM] stream.onAttach error: {$e->getMessage()}\n"); }
            }
            return;
        }

        if ($type === 'stream_detach') {
            $sid = $header['_sid'] ?? '';
            $stream = $this->streams[$sid] ?? null;
            if ($stream && $stream->onDetach !== null) {
                try { ($stream->onDetach)(['id' => $header['_peer'] ?? '']); }
                catch (Throwable $e) { fwrite(STDERR, "[APM] stream.onDetach error: {$e->getMessage()}\n"); }
            }
            return;
        }

        if ($type === 'stream_data') {
            $sid = $header['_sid'] ?? '';
            $stream = $this->streams[$sid] ?? null;
            if ($stream && $stream->onData !== null) {
                $peer = isset($header['_peer']) ? ['id' => $header['_peer']] : null;
                try { ($stream->onData)($binaryData, $peer); }
                catch (Throwable $e) { fwrite(STDERR, "[APM] stream.onData error: {$e->getMessage()}\n"); }
            }
            return;
        }

        if ($type === 'stream_close') {
            $sid = $header['_sid'] ?? '';
            $stream = $this->streams[$sid] ?? null;
            if ($stream) {
                $stream->_markClosed();
                unset($this->streams[$sid]);
                if ($stream->onClose !== null) {
                    try { ($stream->onClose)(); }
                    catch (Throwable $e) { fwrite(STDERR, "[APM] stream.onClose error: {$e->getMessage()}\n"); }
                }
            }
            return;
        }

        // ── Session frames ───────────────────────────────────────────────────

        if (empty($header['_sessionId'])) return;

        $sid  = $header['_sessionId'];
        $sess = $this->sessions[$sid] ?? null;

        if ($sess === null) {
            // A close event for a session we never saw needs no session object.
            if (($header['_type'] ?? '') === 'event' && ($header['_event'] ?? '') === 'connectionClosed') return;

            $sess = new ApmSession();
            $sess->sessionId    = $sid;
            $sess->sessionType  = $header['_sessionType']  ?? 'new';
            $sess->sessionData  = $header['_sessionData']  ?? [];
            $sess->protocol     = $header['protocol']      ?? '';
            $sess->method       = $header['method']        ?? null;
            $sess->path         = $header['path']          ?? '/';
            $sess->path_array   = $header['path_array']    ?? [];
            $sess->query        = $header['query']         ?? [];
            $sess->query_object = $header['query_object']  ?? [];
            $sess->cookies      = $header['cookies']       ?? [];
            $sess->headers      = $header['headers']       ?? [];
            // remoteAddress may be a comma-separated list; keep the first entry.
            $parts              = explode(',', $header['remoteAddress'] ?? '');
            $sess->remoteIp     = trim($parts[0] ?? '');

            $this->sessions[$sid] = $sess;

            try {
                ($this->onConnect)($sess);
            } catch (Throwable $e) {
                fwrite(STDERR, "[APM] onConnect error: {$e->getMessage()}\n");
            }

            // Data frames may have been queued if onConnect pumped the loop
            // (e.g. via request()) before installing onData; replay them now.
            foreach ($sess->backlog as $item) {
                if ($sess->active() && $sess->onData !== null) {
                    $this->_deliverData($sess, $item['data'], $item['binary']);
                }
            }
            $sess->backlog = [];
            return;
        }

        // Later frames may refresh parts of the request description.
        if (!empty($header['headers']))      $sess->headers      = array_merge($sess->headers, $header['headers']);
        if (!empty($header['path']))         $sess->path         = $header['path'];
        if (!empty($header['path_array']))   $sess->path_array   = $header['path_array'];
        if (!empty($header['query']))        $sess->query        = $header['query'];
        if (!empty($header['query_object'])) $sess->query_object = $header['query_object'];
        if (!empty($header['cookies']))      $sess->cookies      = $header['cookies'];
        if (!empty($header['method']))       $sess->method       = $header['method'];

        if (!$sess->active()) return;

        $event = $header['_event'] ?? '';

        if ($type === 'data' || $type === 'chunk') {
            $isBinary = ($header['dataType'] ?? '') === 'binary';
            if ($sess->onData !== null) {
                $this->_deliverData($sess, $binaryData, $isBinary);
            } else {
                $sess->backlog[] = ['data' => $binaryData, 'binary' => $isBinary];
            }
            return;
        }

        if ($type === 'event' && $event === 'connectionClosed') {
            if ($sess->onClose !== null) {
                try { ($sess->onClose)(); }
                catch (Throwable $e) { fwrite(STDERR, "[APM] session.onClose error: {$e->getMessage()}\n"); }
            }
            $sess->_markDeleted();
            unset($this->sessions[$sid]);
        }
    }

    /**
     * Invokes the session's onData handler, logging (not propagating) anything
     * it throws so one faulty handler cannot terminate the event loop.
     */
    private function _deliverData(ApmSession $sess, string $data, bool $isBinary): void
    {
        try { ($sess->onData)($data, $isBinary); }
        catch (Throwable $e) { fwrite(STDERR, "[APM] session.onData error: {$e->getMessage()}\n"); }
    }
}
