diff --git a/SUMMARY.md b/SUMMARY.md index f707784..50d0c21 100644 --- a/SUMMARY.md +++ b/SUMMARY.md @@ -91,6 +91,7 @@ ## ⚡️ gRPC - [Intro into gRPC](grpc/grpc.md) +- [Interceptors](grpc/interceptors.md) - [Protoreg](grpc/protoreg.md) ## 📈 Logging and Observability diff --git a/grpc/grpc.md b/grpc/grpc.md index 690842c..2d3e605 100644 --- a/grpc/grpc.md +++ b/grpc/grpc.md @@ -7,6 +7,8 @@ It consists of two main parts: 1. **protoc-plugin `protoc-gen-php-grpc`:** This is a plugin for the protoc compiler that generates PHP code from a gRPC service definition file (`.proto`). It generates PHP classes that correspond to the service definition and message types. These classes provide an interface for handling incoming gRPC requests and sending responses back to the client. 2. **gRPC server:** This is a server that starts PHP workers and listens for incoming gRPC requests. It receives requests from gRPC clients, proxies them to the PHP workers, and sends the responses back to the client. The server is responsible for managing the lifecycle of the PHP workers and ensuring that they are available to handle requests. +For custom gRPC interceptor plugins, see [Interceptors](./interceptors.md). + ## Protoc-plugin The first step is to define a `.proto` file that describes the gRPC service and messages that your PHP application will handle. diff --git a/grpc/interceptors.md b/grpc/interceptors.md new file mode 100644 index 0000000..8310848 --- /dev/null +++ b/grpc/interceptors.md @@ -0,0 +1,231 @@ +# gRPC Interceptors + +RoadRunner gRPC plugin supports custom unary interceptors. + +Use interceptors when you want to add cross-cutting behavior around RPC calls, such as: + +- request/response logging, +- auth checks, +- rate limiting, +- custom metrics. + +{% hint style="info" %} +At the moment, RoadRunner supports unary gRPC interceptors (`grpc.UnaryServerInterceptor`). +{% endhint %} + +## Interceptor contract + +Your plugin must implement this interface: + +{% code title="grpc/api/interfaces.go" %} + +```go +type Interceptor interface { + UnaryServerInterceptor() grpc.UnaryServerInterceptor + Name() string +} +``` + +{% endcode %} + +The `Name()` return value is used in the `grpc.interceptors` configuration list. + +## Configuration + +Add interceptor names under the `grpc.interceptors` section: + +{% code title=".rr.yaml" %} + +```yaml +version: "3" + +server: + command: "php grpc-worker.php" + +grpc: + listen: "tcp://127.0.0.1:9001" + + proto: + - "proto/helloworld.proto" + + interceptors: + - "sample-grpc-interceptor" +``` + +{% endcode %} + +{% hint style="warning" %} +Make sure every name in `grpc.interceptors` matches a registered interceptor plugin name. +{% endhint %} + +## Execution order + +RoadRunner applies configured interceptors in the same order as the config list. + +Example: + +{% code title=".rr.yaml" %} + +```yaml +grpc: + interceptors: ["first", "second", "third"] +``` + +{% endcode %} + +Execution order will be: + +`first -> second -> third -> handler` + +## Example plugin + +Ready-to-use example code is available in the samples repository: + +- [gRPC interceptor sample](https://github.com/roadrunner-server/samples/tree/master/grpc_interceptor) + +## Build a custom RoadRunner binary with interceptor + +### Build options + +#### Build with Velox + +Use [Velox](../customization/build.md) when you want to build RoadRunner from a `velox.toml` configuration. + +{% code title="velox.toml" %} + +```toml +[roadrunner] +ref = "v2025.1.5" + +[github] +[github.token] +token = "${RT_TOKEN}" + +[github.plugins] +appLogger = { ref = "v5.0.2", owner = "roadrunner-server", repository = "app-logger" } +logger = { ref = "v5.0.2", owner = "roadrunner-server", repository = "logger" } +lock = { ref = "v5.0.2", owner = "roadrunner-server", repository = "lock" } +rpc = { ref = "v5.0.2", owner = "roadrunner-server", repository = "rpc" } +server = { ref = "v5.0.2", owner = "roadrunner-server", repository = "server" } +grpc = { ref = "v5.0.2", owner = "roadrunner-server", repository = "grpc" } +grpcInterceptor = { ref = "master", owner = "roadrunner-server", repository = "samples", folder = "grpc_interceptor" } + +[log] +level = "info" +mode = "production" +``` + +{% endcode %} + +{% code %} + +```bash +go install github.com/roadrunner-server/velox/v2025/cmd/vx@latest +vx build -c velox.toml -o . +``` + +{% endcode %} + +The binary is created as `./rr`. + +#### Build with RoadRunner + +Use this option when you build directly from the RoadRunner repository. + +##### Clone RoadRunner + +{% code %} + +```bash +git clone https://github.com/roadrunner-server/roadrunner.git +cd roadrunner +``` + +{% endcode %} + +##### Add interceptor package + +You can either: + +- copy sample code into your project, or +- import the sample package directly. + +To import the sample package directly: + +{% code %} + +```bash +go get github.com/roadrunner-server/samples/grpc_interceptor@latest +``` + +{% endcode %} + +##### Register plugin in container + +Edit `container/plugins.go` and add your interceptor plugin to the plugin list: + +{% code title="container/plugins.go" %} + +```go +import ( + grpcPlugin "github.com/roadrunner-server/grpc/v5" + grpcInterceptor "github.com/roadrunner-server/samples/grpc_interceptor" +) + +func Plugins() []any { + return []any{ + // ... + &grpcInterceptor.Plugin{}, + &grpcPlugin.Plugin{}, + // ... + } +} +``` + +{% endcode %} + +##### Build binary + +{% code %} + +```bash +make build +``` + +{% endcode %} + +The binary is created as `./rr`. + +### Configure interceptor in `.rr.yaml` + +Use the plugin name returned by `Name()`: + +{% code title=".rr.yaml" %} + +```yaml +grpc: + interceptors: + - "sample-grpc-interceptor" +``` + +{% endcode %} + +### Run and verify + +Start RoadRunner: + +{% code %} + +```bash +./rr serve -c .rr.yaml +``` + +{% endcode %} + +Send a request using your preferred gRPC client (for example, `grpc-client-cli`) and verify interceptor logs in RR output. + +## What's next? + +1. [Intro into gRPC](./grpc.md) +2. [Writing a Middleware](../customization/middleware.md) +3. [Building RR with a custom plugin](../customization/build.md) diff --git a/queues/amqp.md b/queues/amqp.md index a70138e..bf632ad 100644 --- a/queues/amqp.md +++ b/queues/amqp.md @@ -64,7 +64,60 @@ amqp: Upon establishing a connection to the server, you can create a new queue that utilizes this connection and encompasses the queue settings, including those specific to AMQP. -## Configuration +## Pipeline Configuration Formats + +AMQP pipeline configuration is selected by `jobs.pipelines..config.version`: + +- `0`, `1`, or omitted: legacy flat parser. +- `2`: nested parser with `exchange` and `queue` sections (recommended). + +## Version 2 Configuration (Recommended) + +Set `jobs.pipelines..config.version: 2` and define `exchange` and `queue` sections. + +### Options in `config` + +- `priority`: pipeline priority. If a job has priority `0`, it inherits the pipeline priority. Default: `10`. +- `prefetch`: RabbitMQ QoS prefetch. Default: `10`. +- `redial_timeout`: reconnect timeout in seconds. Default: `60`. + +### Exchange settings + +- `name`: exchange name. Default: `amqp.default`. +- `type`: exchange type. Supported: `direct`, `fanout`, `topic`, `headers`. Default: `direct`. +- `durable`: durable exchange flag. Default: `false`. +- `auto_delete`: auto-delete exchange when last queue is unbound. Default: `false`. +- `declare`: declare exchange on startup. Default: `true`. + +### Queue settings + +- `name`: queue name. Optional for producer-only pipelines; required for `run`, `resume`, and `pause`. +- `routing_key`: routing key. Required when `exchange.type != fanout`. +- `durable`: durable queue flag. Default: `false`. +- `auto_delete`: auto-delete queue after the last consumer unsubscribes. Default: `false`. +- `exclusive`: exclusive queue flag. Default: `false`. +- `consumer_id`: consumer identifier. Default: `roadrunner-`. +- `delete_on_stop`: delete queue when pipeline stops. Default: `false`. +- `multiple_ack`: ACK this and prior unacked deliveries on the same channel. Default: `false`. +- `requeue_on_fail`: use RabbitMQ requeue on failure (Nack). Default: `false`. +- `headers`: queue declaration arguments (for example, `x-queue-mode: lazy`). +- `declare`: declare and bind queue on startup. Default: `true`. + +{% hint style="info" %} +See also [AMQP model](https://www.rabbitmq.com/tutorials/amqp-concepts.html#amqp-model) documentation section. +{% endhint %} + +{% hint style="info" %} +Producer-only pipeline: `queue.name` can be empty, `push` works, but `run`, `resume`, and `pause` will fail without a queue name. +{% endhint %} + +{% hint style="info" %} +If `exchange.type` is not `fanout`, `queue.routing_key` must be set. +{% endhint %} + +{% hint style="info" %} +Read more about Nack in RabbitMQ official docs: https://www.rabbitmq.com/confirms.html#consumer-nacks-requeue +{% endhint %} {% code title=".rr.yaml" %} @@ -72,235 +125,165 @@ the queue settings, including those specific to AMQP. version: "3" amqp: - addr: amqp://guest:guest@127.0.0.1:5672 - - # AMQPS TLS configuration - # - # This section is optional + addr: amqp://guest:guest@127.0.0.1:5672/ tls: - # Path to the key file - # - # This option is required key: "" - - # Path to the certificate - # - # This option is required cert: "" - - # Path to Root CAs used by the AMQP client to trust and verify the broker/server certificate during TLS dial. - # - # This option is optional root_ca: "" - - # Client auth type (mTLS, peer verification). - # - # This option is optional. Default value: no_client_cert. Possible values: no_client_cert, request_client_cert, require_any_client_cert, verify_client_cert_if_given, require_and_verify_client_cert client_auth_type: no_client_cert jobs: pipelines: - # User defined name of the queue. - example: - # Driver name - # - # This option is required. + p1: driver: amqp - - # Driver's configuration - # - # Should not be empty config: - - # QoS - prefetch. - # - # Default: 10 + version: 2 + priority: 10 prefetch: 10 - - # Pipeline priority - # - # If the job has priority set to 0, it will inherit the pipeline's priority. Default: 10. - priority: 1 - - # Redial timeout (in seconds). How long to try to reconnect to the AMQP server. - # - # Default: 60 redial_timeout: 60 - - # Durable queue - # - # Default: false - durable: false - - # Durable exchange (rabbitmq option: https://www.rabbitmq.com/tutorials/amqp-concepts.html#exchanges) - # - # Default: false - exchange_durable: false - - # Auto-delete (exchange is deleted when last queue is unbound from it): https://www.rabbitmq.com/tutorials/amqp-concepts.html#exchanges - # - # Default: false - exchange_auto_delete: false - - # Auto-delete (queue that has had at least one consumer is deleted when last consumer unsubscribes) (rabbitmq option: https://www.rabbitmq.com/queues.html#properties) - # - # Default: false - queue_auto_delete: false - - # Delete queue when stopping the pipeline - # - # Default: false - delete_queue_on_stop: false - - # Queue name - # - # Optional for producer-only pipelines. Required for run/resume/pause. - # Can be omitted for push-only mode. - queue: test-1-queue - - # Exchange name - # - # Optional. Default: amqp.default - exchange: amqp.default - - # Exchange type - # - # Default: direct. Possible values: direct, fanout, topic, headers. - exchange_type: direct - - # Routing key for the queue - # - # Default: empty. Required for push when exchange_type != fanout. - routing_key: test - - # Declare a queue exclusive at the exchange - # - # Default: false - exclusive: false - - # When multiple is true, this delivery and all prior unacknowledged deliveries - # on the same channel will be acknowledged. This is useful for batch processing - # of deliveries - # - # Default: false - multiple_ack: false - - # The consumer_id is identified by a string that is unique and scoped for all consumers on this channel. - # - # Default: "roadrunner" + uuid. - consumer_id: "roadrunner-uuid" - - # Use rabbitmq mechanism to requeue the job on fail - # - # Default: false - requeue_on_fail: false - - # Queue headers (new in 2.12.2) - # - # Default: null - queue_headers: - x-queue-mode: lazy + exchange: + name: amqp.default + type: direct + durable: false + auto_delete: false + declare: true + queue: + name: p1-queue + routing_key: p1 + durable: false + auto_delete: false + exclusive: false + consumer_id: "" + delete_on_stop: false + multiple_ack: false + requeue_on_fail: false + headers: + x-queue-mode: lazy + declare: true ``` {% endcode %} -## Configuration options +## Read-only RabbitMQ Permissions -**Here is a detailed description of each of the amqp-specific options:** +If the RabbitMQ user does not have `configure` permission, disable declarations in pipeline configuration: -### Priority +- `jobs.pipelines..config.exchange.declare: false` +- `jobs.pipelines..config.queue.declare: false` -`priority`- job priority. A lower value corresponds to a higher priority. For instance, consider two pipelines: `pipe1` -with a priority of `1` and `pipe10` with a priority of `10`. Workers will only take jobs from `pipe10` if all the jobs -from `pipe1` have been processed. +This avoids startup declaration failures and passive queue-check failures in restricted environments. -### Prefetch - -`prefetch` - rabbitMQ QoS prefetch. See also ["prefetch-size"](https://www.rabbitmq.com/amqp-0-9-1-reference.html#basic.qos.prefetch-size). Note that if you use a large number of workers and a small `prefetch` number, some of the workers may not be loaded with messages (jobs) due to the blocking nature of the prefetch. This would result in poor RoadRunner performance and waste of resources. +{% code title=".rr.yaml" %} -### Queue +```yaml +version: "3" -`queue` - AMQP internal (inside the driver) queue name. Optional for producer-only pipelines, required for consumer lifecycle operations (`run`, `resume`, `pause`). +amqp: + addr: amqp://readonly:readonly@127.0.0.1:5675/TEST -### Exchange +jobs: + pipelines: + readonly: + driver: amqp + config: + version: 2 + exchange: + name: test-1-exchange + type: fanout + durable: true + auto_delete: false + declare: false + queue: + name: test-1-queue + routing_key: test-1 + durable: true + auto_delete: false + exclusive: false + declare: false +``` -`exchange` - rabbitMQ exchange name. Optional, default: `amqp.default`. +{% endcode %} -{% hint style="info" %} -See also [AMQP model](https://www.rabbitmq.com/tutorials/amqp-concepts.html#amqp-model) documentation section. -{% endhint %} +## Runtime / RPC (`jobs.Declare`) -{% hint style="info" %} -Producer-only pipeline: `queue` can be empty, `push` works, but `run`, `resume`, and `pause` will fail without a queue. -{% endhint %} +Dynamic pipeline declaration over RPC remains flat. It does not use nested `config.exchange` / `config.queue` sections. -### Exchange type +Declaration control keys in `jobs.Declare` payload: -`exchange_type` - rabbitMQ exchange type. May be one of `direct`, `fanout`, `topic`, `headers`. +- `exchange_declare` +- `queue_declare` -### Routing key +## Legacy Flat Configuration (`version: 0|1`) -`routing_key` - queue's routing key. Required for `push` when `exchange_type != fanout`. +Legacy flat keys remain supported under `jobs.pipelines..config` when `config.version` is `0`, `1`, or omitted: -### Exclusive +- `priority` +- `prefetch` +- `queue` +- `exchange` +- `exchange_type` +- `routing_key` +- `exclusive` +- `multiple_ack` +- `requeue_on_fail` +- `queue_headers` +- `durable` +- `delete_queue_on_stop` +- `redial_timeout` +- `exchange_durable` +- `exchange_auto_delete` +- `queue_auto_delete` +- `consumer_id` -`exclusive` - applied to the queue, exclusive queues cannot be redeclared. If set to true, and you attempt to declare -the same pipeline twice, it will result in an error. +{% code title=".rr.yaml" %} -### Multiple ack +```yaml +version: "3" -`multiple_ack` - this delivery, along with all prior unacknowledged deliveries on the same channel, will be -acknowledged. This feature is beneficial for batch processing of deliveries and is applicable only for `Ack`, not for -`Nack`. +amqp: + addr: amqp://guest:guest@127.0.0.1:5672/ -### Requeue on fail +jobs: + pipelines: + p1: + driver: amqp + config: + version: 1 + priority: 10 + prefetch: 10 + queue: p1-queue + exchange: amqp.default + exchange_type: direct + routing_key: p1 + exclusive: false + multiple_ack: false + requeue_on_fail: false + queue_headers: {} # optional, e.g. { x-queue-mode: lazy } + durable: false + delete_queue_on_stop: false + redial_timeout: 60 + exchange_durable: false + exchange_auto_delete: false + queue_auto_delete: false + consumer_id: "" +``` -`requeue_on_fail` - requeue on Nack (by RabbitMQ). +{% endcode %} {% hint style="info" %} -Read more about Nack in RabbitMQ official docs: https://www.rabbitmq.com/confirms.html#consumer-nacks-requeue +In legacy format, `queue` can be omitted for push-only pipelines, but `run`, `resume`, and `pause` require a queue name. Also, `routing_key` is required when `exchange_type != fanout`. {% endhint %} -### Queue headers - -`queue_headers` - used to pass arguments to the `Queue` create method, such as `x-queue-mode: lazy` - -### Durable - -`durable` - create a durable queue. - -Default: `false` - -### Delete queue on stop - -`delete_queue_on_stop` - delete the queue when the pipeline is stopped. - -Default: `false` - -### Redial timeout - -`redial_timeout` - Redial timeout (in seconds). How long to try to reconnect to the AMQP server. - -### Exchange durable - -`exchange_durable` - Durable -exchange ([rabbitmq option](https://www.rabbitmq.com/tutorials/amqp-concepts.html#exchanges)). - -Default: `false` - -### Exchange auto delete - -`exchange_auto_delete` - Auto-delete (exchange is deleted when last queue is unbound from it): [link](https://www.rabbitmq.com/tutorials/amqp-concepts.html#exchanges). - -Default: `false` - -### Queue auto delete - -`queue_auto_delete` - Auto-delete (queue that has had at least one consumer is deleted when last consumer -unsubscribes): [link](https://www.rabbitmq.com/queues.html#properties). +## Migration -Default: `false` +- Prefer `config.version: 2` with nested `exchange` and `queue` sections for new configurations. +- Existing flat configurations remain compatible through `config.version: 0`, `config.version: 1`, or omitted `config.version`. +- For restricted RabbitMQ permissions, set `exchange.declare: false` and `queue.declare: false`. -### Consumer id +## What's Next? -`consumer_id` - string that is unique and scoped for all consumers on this channel. +1. [Queues and Jobs overview](overview-queues.md) - Review the full jobs pipeline model before configuring AMQP in production. +2. [Read-only RabbitMQ permissions](https://www.rabbitmq.com/docs/access-control) - See declaration flags for restricted users and review the Runtime / RPC (`jobs.Declare`) section on this page for flat declaration keys. +3. [Pipeline configuration formats](../intro/config.md) - Confirm when to use `config.version: 2` versus legacy `0|1`, and review the general RoadRunner configuration structure. +4. [Exchange settings](kafka.md) and [Queue settings](sqs.md) - Compare exchange and queue responsibilities when tuning routing and consumption behavior. +5. [Allocate Timeout](../known-issues/allocate-timeout.md) and [CRC validation failed](../known-issues/stdout-crc.md) - Use these troubleshooting references when workers fail to process queue jobs as expected.