package ApmModule;
# ApmModule.pm v1.0.0
#   APM child-process connector for Perl 5.10+
#   IPC via stdin/stdout binary frames — identical wire protocol to Node.js connector.
#
#   Requires: JSON (cpan install JSON  or  apt install libjson-perl)
#
#   APM injects env vars:
#     APM=1         — confirms process is managed by APM
#     APM_INDEX     — 0-based instance index (if env_index configured)
#
#   Usage:
#     use ApmModule;
#
#     my $apm = ApmModule->new(sub {
#         my ($s) = @_;
#         $s->write("Hello World", { 'x-status' => '200', 'content-type' => 'text/plain' });
#         $s->close();
#     });
#     $apm->run();

use strict;
use warnings;
use JSON qw(encode_json decode_json);

# Stand-alone mode: `perl ApmModule.pm -update` overwrites this file with the
# published connector. caller() is empty only when the file is executed as a
# script, so nothing in this block runs when the module is loaded via
# use/require.
unless (caller()) {
    if (grep { $_ eq '-update' } @ARGV) {
        my $url  = 'https://processmanager.dev/connectors/ApmModule.pm';
        my $dest = __FILE__;
        print "Updating $dest ...\n";

        # Downloaders in order of preference; stop at the first one that
        # yields a non-empty body. A missing or failing curl/wget makes the
        # backticks return '' (defined), so the result has to be tested for
        # length -- chaining the commands with // never reaches the fallback.
        my @downloaders = (
            sub { local $@; return eval { require LWP::Simple; LWP::Simple::get($url) } },
            sub { return scalar `curl -fsSL "$url" 2>/dev/null` },
            sub { return scalar `wget -qO- "$url" 2>/dev/null` },
        );
        my $data;
        for my $download (@downloaders) {
            $data = $download->();
            last if defined $data && length $data;
        }
        die "Update failed: could not download\n" unless defined $data && length $data;

        open(my $fh, '>', $dest) or die "Update failed: $!\n";
        # Buffered write errors (e.g. a full disk) only surface on print/close,
        # so both are checked before success is reported.
        print {$fh} $data or die "Update failed: $!\n";
        close($fh)        or die "Update failed: $!\n";
        print "Updated.\n";
        exit 0;
    }
    print STDERR "Usage: perl ApmModule.pm -update\n";
    exit 1;
}

# Refuse to load unless the APM environment variable is set (the daemon
# injects APM=1 into every worker it manages).
die "[APM] Must run under APM daemon (APM env var not set)\n"
    unless $ENV{APM};

# ─── Session ──────────────────────────────────────────────────────────────────

package ApmSession;

# Build a blank session record. Every request field starts at its empty
# default; instance_id is taken from APM_INDEX ('0' when that is unset).
sub new {
    my ($class) = @_;

    my $instance = defined $ENV{APM_INDEX} ? $ENV{APM_INDEX} : '0';

    my %fields = (
        # identity and persisted state
        session_id   => '',
        session_type => 'new',
        session_data => {},

        # request description
        protocol     => '',
        method       => undef,
        path         => '/',
        path_array   => [],
        query        => {},
        query_object => {},
        cookies      => {},
        headers      => {},
        remote_ip    => '',
        instance_id  => $instance,

        # internal bookkeeping
        _deleted     => 0,
        _backlog     => [],

        # user callbacks
        on_data      => undef,    # sub($data, $is_binary)
        on_close     => undef,    # sub()
    );

    return bless \%fields, $class;
}

# True while the session has not been flagged as deleted.
sub active {
    my ($self) = @_;
    return !$self->{_deleted};
}
# Flag the session as deleted; returns the stored flag value (1).
sub _mark_deleted {
    my ($self) = @_;
    return $self->{_deleted} = 1;
}

# Send an HTTP response body or a WebSocket frame for this session.
#   $data    - payload string, or a reference to a scalar holding the payload
#   $headers - optional hashref of extra header fields (pass the HTTP headers
#              on the first write); `dataType` defaults to 'text'
# The `_command` field is always forced to 'write'.
sub write {
    my ($self, $data, $headers) = @_;

    my %frame_hdr = %{ $headers // {} };
    $frame_hdr{_command} = 'write';
    $frame_hdr{dataType} = 'text' unless defined $frame_hdr{dataType};

    # Accept \$payload so large bodies need not be copied by the caller.
    $data = ${$data} if ref($data) eq 'SCALAR';

    return $self->_send(\%frame_hdr, $data);
}

# Send $data with the `writeRaw` command, i.e. as raw bytes that bypass
# HTTP/WS framing.
sub write_raw {
    my ($self, $data) = @_;
    my %frame_hdr = (_command => 'writeRaw');
    return $self->_send(\%frame_hdr, $data);
}

# Close the connection (HTTP: status code response, WS: close frame).
#   $code   - status / close code, 200 when omitted
#   $reason - reason text, empty when omitted
sub close {
    my ($self, $code, $reason) = @_;

    my %frame_hdr = (
        _command => 'closeConnection',
        code     => defined $code   ? $code   : 200,
        _reason  => defined $reason ? $reason : '',
    );

    return $self->_send(\%frame_hdr, '');
}

# Hand the current session_data to the APM daemon (`saveSessionData` command)
# so it is persisted there and survives rolling restarts.
sub save_session_data {
    my ($self) = @_;

    my %frame_hdr = (
        _command     => 'saveSessionData',
        _sessionData => $self->{session_data},
    );

    return $self->_send(\%frame_hdr, '');
}

# Frame one command for this session and write it to STDOUT.
#   $json_hdr - hashref of command fields; `_session` is added to it in place
#   $binary   - optional payload placed after the 0x03 separator
# Wire layout: 0x05, uint32 (big-endian) length, JSON header, 0x03, payload,
# where the length covers the JSON, the separator and the payload.
# Returns the result of the final flush.
sub _send {
    my ($self, $json_hdr, $binary) = @_;
    $binary //= '';

    # The declared length must count the bytes that actually reach the pipe.
    # Strings that fit in single bytes are sent as-is; a payload holding wide
    # characters is encoded to UTF-8 first, otherwise length() would count
    # characters while print emits more bytes and the frame would be corrupt.
    utf8::encode($binary) unless utf8::downgrade($binary, 1);

    $json_hdr->{_session} = $self->{session_id};

    # JSON's functions are imported into package ApmModule only, so inside
    # ApmSession the encoder has to be called by its fully qualified name.
    my $json = JSON::encode_json($json_hdr);

    # Outgoing uint32 = json.len + 1 (0x03) + binary.len
    my $length = length($json) + 1 + length($binary);
    my $frame  = "\x05" . pack('N', $length) . $json . "\x03" . $binary;

    # Perl 5.10/5.12 do not load IO::Handle on demand for STDOUT->flush.
    require IO::Handle;
    binmode STDOUT;
    print STDOUT $frame;
    STDOUT->flush();
}


# ─── Module ───────────────────────────────────────────────────────────────────

package ApmModule;

# Create a connector.
#   $on_connect - code ref stored as the handler called with each new session
# instance_id is taken from APM_INDEX ('0' when that is unset).
sub new {
    my ($class, $on_connect) = @_;

    my $instance = defined $ENV{APM_INDEX} ? $ENV{APM_INDEX} : '0';

    my %state = (
        _on_connect => $on_connect,
        _sessions   => {},
        _buf        => '',
        instance_id => $instance,
    );

    return bless \%state, $class;
}

# Emit a custom metric as a `metric` command frame on STDOUT.
#   $name  - metric name
#   $value - metric value; coerced to a number so it is encoded as a JSON number
#   $type  - 'counter' | 'gauge' | 'timing' (defaults to 'counter')
# Frame layout: 0x05, uint32 length (JSON + separator), JSON, 0x03.
sub metric {
    my ($self, $name, $value, $type) = @_;
    $type //= 'counter';
    my $json   = encode_json({ _command => 'metric', name => $name, value => $value+0, type => $type });
    my $length = length($json) + 1;
    my $frame  = "\x05" . pack('N', $length) . $json . "\x03";

    # STDOUT->flush only resolves once IO::Handle is loaded; Perl 5.10/5.12
    # (the documented minimum) do not load it on demand.
    require IO::Handle;
    binmode STDOUT;
    print STDOUT $frame;
    STDOUT->flush();
}

# Push a value to a dashboard module defined in the worker's config, as a
# `dash_value` command frame on STDOUT.
#   $module_id - integer id from the dashboard config block (coerced to a number)
#   $value     - numeric value to display (coerced to a number)
#   $color     - optional CSS color string to override the module color;
#                sent as '' when omitted
# Frame layout: 0x05, uint32 length (JSON + separator), JSON, 0x03.
sub set_dash_value {
    my ($self, $module_id, $value, $color) = @_;
    $color //= '';
    my $json   = encode_json({ _command => 'dash_value', module_id => $module_id+0, value => $value+0, color => $color });
    my $length = length($json) + 1;
    my $frame  = "\x05" . pack('N', $length) . $json . "\x03";

    # STDOUT->flush only resolves once IO::Handle is loaded; Perl 5.10/5.12
    # (the documented minimum) do not load it on demand.
    require IO::Handle;
    binmode STDOUT;
    print STDOUT $frame;
    STDOUT->flush();
}

# Start the event loop. Blocks until stdin closes (APM restarts/stops the
# worker) or a read fails, feeding every received chunk to the frame parser.
#
# sysread is used deliberately: the buffered read() keeps waiting on a pipe
# until the full 65536 bytes (or EOF) have arrived, which would hold back
# small frames indefinitely. sysread returns as soon as any bytes are
# available.
sub run {
    my ($self) = @_;
    binmode STDIN;
    binmode STDOUT;
    while (1) {
        my $chunk = '';
        my $n = sysread(STDIN, $chunk, 65536);
        if (!defined $n) {
            # A signal interrupted the call before any data arrived; retry.
            next if $!{EINTR};
            last;
        }
        last if $n == 0;    # EOF: the daemon closed our stdin
        $self->{_buf} .= $chunk;
        $self->_process_buffer();
    }
}

# Cut every complete frame off the front of the receive buffer and hand it
# to _process_frame. Returns as soon as the buffer holds only a partial
# frame, leaving those bytes in place for the next read.
sub _process_buffer {
    my ($self) = @_;
    my $buf = \$self->{_buf};

    while (length $$buf) {
        # Re-sync: throw away anything ahead of the next 0x05 start marker.
        if (substr($$buf, 0, 1) ne "\x05") {
            my $start = index($$buf, "\x05");
            if ($start < 0) {
                $$buf = '';
                return;
            }
            substr($$buf, 0, $start, '');
        }

        # Need the marker plus the 4-byte length before anything can be sized.
        return if length($$buf) < 5;

        # Incoming: uint32 = json.len + binary.len  (0x03 NOT counted in uint32)
        my $payload_len = unpack('N', substr($$buf, 1, 4));
        my $frame_len   = 1 + 4 + $payload_len + 1;    # marker, length, payload, 0x03

        return if length($$buf) < $frame_len;

        # Four-argument substr removes the frame from the buffer and returns it.
        my $frame = substr($$buf, 0, $frame_len, '');
        $self->_process_frame($frame);
    }
}

# Decode one complete inbound frame and route it to its session.
#
#   $frame - raw bytes laid out as [0x05][4-byte len][JSON][0x03][binary]
#
# Frames whose header is not a JSON object carrying a true `_sessionId` are
# ignored. The first frame seen for a session id creates the ApmSession,
# copies the request fields out of the header and calls the on_connect
# handler. Later frames refresh the request metadata and then either deliver
# payload bytes (`_type` data/chunk) or tear the session down
# (`connectionClosed` event).
#
# Errors thrown by user callbacks are reported on STDERR instead of being
# propagated, so a failing handler cannot terminate the read loop.
sub _process_frame {
    my ($self, $frame) = @_;

    my $sep = index($frame, "\x03", 5);
    return if $sep == -1;

    # decode_json may return an array (or die on garbage); only a JSON object
    # is a usable header, anything else is dropped without dereferencing it.
    my $header = eval { decode_json(substr($frame, 5, $sep - 5)) };
    return unless ref($header) eq 'HASH' && $header->{_sessionId};

    my $sid   = $header->{_sessionId};
    my $sess  = $self->{_sessions}{$sid};
    my $type  = $header->{_type}  // '';
    my $event = $header->{_event} // '';

    if (!defined $sess) {
        # A close notification for a session we never saw needs no bookkeeping.
        return if $type eq 'event' && $event eq 'connectionClosed';

        $sess = ApmSession->new();
        $sess->{session_id}   = $sid;
        $sess->{session_type} = $header->{_sessionType}  // 'new';
        $sess->{session_data} = $header->{_sessionData}  // {};
        $sess->{protocol}     = $header->{protocol}      // '';
        $sess->{method}       = $header->{method}        // undef;
        $sess->{path}         = $header->{path}          // '/';
        $sess->{path_array}   = $header->{path_array}    // [];
        $sess->{query}        = $header->{query}         // {};
        $sess->{query_object} = $header->{query_object}  // {};
        $sess->{cookies}      = $header->{cookies}       // {};
        $sess->{headers}      = $header->{headers}       // {};

        # remoteAddress may be a comma-separated list; keep the first entry, trimmed.
        my @parts             = split(/,/, $header->{remoteAddress} // '');
        ($sess->{remote_ip}   = $parts[0] // '') =~ s/^\s+|\s+$//g;

        $self->{_sessions}{$sid} = $sess;

        eval { $self->{_on_connect}->($sess); 1 }
            or print STDERR "[APM] on_connect error: $@\n";

        $self->_flush_backlog($sess);
        return;
    }

    # Known session: merge new headers, replace the other request fields.
    if ($header->{headers})      { %{$sess->{headers}}      = (%{$sess->{headers}},      %{$header->{headers}});  }
    if ($header->{path})         { $sess->{path}         = $header->{path};         }
    if ($header->{path_array})   { $sess->{path_array}   = $header->{path_array};   }
    if ($header->{query})        { $sess->{query}        = $header->{query};        }
    if ($header->{query_object}) { $sess->{query_object} = $header->{query_object}; }
    if ($header->{cookies})      { $sess->{cookies}      = $header->{cookies};      }
    if ($header->{method})       { $sess->{method}       = $header->{method};       }

    return unless $sess->active();

    if ($type eq 'data' || $type eq 'chunk') {
        my $is_binary = (($header->{dataType} // '') eq 'binary') ? 1 : 0;

        # Always queue first, then drain: payloads that arrived before an
        # on_data handler existed are delivered ahead of this one, in order.
        push @{ $sess->{_backlog} }, { data => substr($frame, $sep + 1), binary => $is_binary };
        $self->_flush_backlog($sess);
        return;
    }

    if ($type eq 'event' && $event eq 'connectionClosed') {
        if ($sess->{on_close}) {
            eval { $sess->{on_close}->(); 1 }
                or print STDERR "[APM] on_close error: $@\n";
        }
        $sess->_mark_deleted();
        delete $self->{_sessions}{$sid};
    }
    return;
}

# Deliver queued payloads to the session's on_data handler, oldest first.
# Stops when the queue is empty, the session is no longer active, or no
# handler is installed (remaining items stay queued). Handler errors are
# logged to STDERR and do not stop the drain.
sub _flush_backlog {
    my ($self, $sess) = @_;

    while (@{ $sess->{_backlog} } && $sess->active() && $sess->{on_data}) {
        my $item = shift @{ $sess->{_backlog} };
        eval { $sess->{on_data}->($item->{data}, $item->{binary}); 1 }
            or print STDERR "[APM] on_data error: $@\n";
    }
    return;
}

1;
