# Notification Producer

> Background service that monitors blockchain events and generates push notifications for CoW Protocol users across multiple chains

The Notification Producer is a background service that monitors blockchain events and generates push notifications for users subscribed to CoW Protocol updates. It tracks trades, expired orders, and CMS notifications across multiple chains.

## Purpose and Responsibilities

The Notification Producer performs the following tasks:

* **Trade Notifications**: Monitors executed trades and generates notifications for users
* **Expired Order Notifications**: Detects expired orders and notifies affected users
* **CMS Notifications**: Fetches and distributes content management system notifications
* **Multi-Chain Indexing**: Tracks events across all supported CoW Protocol chains
* **State Persistence**: Maintains indexer state to resume from the last processed block after restarts

## How to Run

### Development

```bash theme={null}
# Start required dependencies
docker-compose up -d queue  # RabbitMQ
docker-compose up -d db     # PostgreSQL

# Optional: Start Redis for caching
docker-compose up -d redis

# Run database migrations
yarn migration:run

# Start the notification producer
yarn producer
```

By default, the producer monitors all supported chains.

### Run for Specific Chains

You can limit the producer to specific chains using the `NOTIFICATIONS_PRODUCER_CHAINS` environment variable:

```bash theme={null}
# Run producer only for Mainnet (1) and Gnosis Chain (100)
NOTIFICATIONS_PRODUCER_CHAINS=1,100 yarn producer
```
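As a rough illustration of how a comma-separated value like this could be turned into a list of chain IDs, here is a minimal sketch. The `parseChainIds` helper and the `SUPPORTED_CHAIN_IDS` list are hypothetical and are not taken from the producer's source; the actual parsing and validation may differ.

```typescript
// Hypothetical helper: parse a comma-separated list of chain IDs.
// An unset or empty value means "all supported chains".
function parseChainIds(
  raw: string | undefined,
  supportedChainIds: number[]
): number[] {
  if (raw === undefined || raw.trim() === '') {
    return supportedChainIds;
  }

  return raw.split(',').map((part) => {
    const chainId = Number(part.trim());
    if (!Number.isInteger(chainId) || !supportedChainIds.includes(chainId)) {
      throw new Error(`Unsupported chain ID: "${part.trim()}"`);
    }
    return chainId;
  });
}

// Assumed list of supported chains, for illustration only.
const SUPPORTED_CHAIN_IDS = [1, 100, 137, 8453, 42161];

const selectedChains = parseChainIds('1,100', SUPPORTED_CHAIN_IDS);
console.log(selectedChains); // [ 1, 100 ]
```

In a real service the first argument would come from `process.env.NOTIFICATIONS_PRODUCER_CHAINS`. Rejecting unknown IDs at startup surfaces typos early instead of silently skipping a chain.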

### Production

```bash theme={null}
# Build the application
yarn build notification-producer

# Start the producer
node dist/apps/notification-producer/main.js
```

### Docker

```bash theme={null}
# Build the Docker image
docker build -f apps/notification-producer/Dockerfile . -t notification-producer

# Run the container
docker run \
  -e QUEUE_HOST=rabbitmq \
  -e QUEUE_USER=rabbit \
  -e QUEUE_PASSWORD=password \
  -e DATABASE_HOST=postgres \
  -e RPC_URL_1=https://mainnet.infura.io/v3/key \
  notification-producer
```

## Key Configuration Options

### Chain Selection

| Variable                        | Description                          | Default              |
| ------------------------------- | ------------------------------------ | -------------------- |
| `NOTIFICATIONS_PRODUCER_CHAINS` | Comma-separated chain IDs to monitor | All supported chains |

**Example**: `NOTIFICATIONS_PRODUCER_CHAINS=1,100,137`

### RPC Configuration

The producer requires an RPC endpoint for each monitored chain, set through an `RPC_URL_<chainId>` variable:

```bash theme={null}
RPC_URL_1=https://mainnet.infura.io/v3/your-key
RPC_URL_100=https://rpc.gnosischain.com
RPC_URL_137=https://polygon-rpc.com
RPC_URL_8453=https://base-rpc.com
RPC_URL_42161=https://arbitrum-rpc.com
```
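The `RPC_URL_<chainId>` naming convention lends itself to a simple lookup at startup. The sketch below is an assumption about how such a lookup could work, not the producer's actual code; `resolveRpcUrls` and the `Env` type are hypothetical names.

```typescript
type Env = Record<string, string | undefined>;

// Hypothetical helper: look up RPC_URL_<chainId> for each monitored chain
// and fail fast, listing every chain that has no endpoint configured.
function resolveRpcUrls(chainIds: number[], env: Env): Map<number, string> {
  const urls = new Map<number, string>();
  const missing: number[] = [];

  for (const chainId of chainIds) {
    const url = env[`RPC_URL_${chainId}`];
    if (url) {
      urls.set(chainId, url);
    } else {
      missing.push(chainId);
    }
  }

  if (missing.length > 0) {
    const names = missing.map((id) => `RPC_URL_${id}`).join(', ');
    throw new Error(`Missing RPC endpoint variables: ${names}`);
  }
  return urls;
}

// Example environment using two of the values shown above.
const exampleEnv: Env = {
  RPC_URL_1: 'https://mainnet.infura.io/v3/your-key',
  RPC_URL_100: 'https://rpc.gnosischain.com',
};

const rpcUrls = resolveRpcUrls([1, 100], exampleEnv);
console.log(rpcUrls.get(100)); // https://rpc.gnosischain.com
```

Passing the environment as a parameter (for example `process.env`) keeps the helper easy to test.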

### Database Configuration

```bash theme={null}
DATABASE_HOST=localhost
DATABASE_PORT=5432
DATABASE_USERNAME=bff-db-user
DATABASE_PASSWORD=bff-db-password
DATABASE_NAME=bff-db
DATABASE_ENABLED=true
```

### RabbitMQ Configuration

```bash theme={null}
QUEUE_HOST=localhost
QUEUE_PORT=5672
QUEUE_USER=rabbit
QUEUE_PASSWORD=my-rabbit-password
```

### Optional: Redis Cache

```bash theme={null}
REDIS_ENABLED=true
REDIS_HOST=localhost
REDIS_PORT=6379
REDIS_USER=cow_redis
REDIS_PASSWORD=cow_password
```

## Producer Components

The service runs three types of notification producers concurrently:

### 1. Trade Notification Producer

**Purpose**: Generates notifications when users' orders are executed.

**How it works**:

* Polls blockchain for new blocks
* Queries CoW Protocol settlement events
* Matches trades to subscribed user accounts
* Enriches notifications with token information and USD values
* Sends notifications to RabbitMQ queue

**Key Features**:

* Batch processing (up to 5000 blocks per batch)
* State persistence for crash recovery
* Handles blockchain reorgs gracefully
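The batch processing listed above can be pictured as splitting an inclusive block range into chunks of at most 5000 blocks. This is a minimal sketch under that assumption; `splitIntoBatches` and `BlockRange` are hypothetical names, and plain numbers are used for block heights to keep the example simple (the real producer may use `bigint`).

```typescript
interface BlockRange {
  fromBlock: number;
  toBlock: number;
}

// Batch size taken from the feature list above.
const MAX_BLOCKS_PER_BATCH = 5000;

// Split the inclusive range [fromBlock, toBlock] into consecutive batches,
// each covering at most `maxBlocks` blocks.
function splitIntoBatches(
  fromBlock: number,
  toBlock: number,
  maxBlocks: number = MAX_BLOCKS_PER_BATCH
): BlockRange[] {
  const batches: BlockRange[] = [];
  for (let start = fromBlock; start <= toBlock; start += maxBlocks) {
    batches.push({
      fromBlock: start,
      toBlock: Math.min(start + maxBlocks - 1, toBlock),
    });
  }
  return batches;
}

// 12,001 blocks are split into three batches:
// 18500000-18504999, 18505000-18509999, 18510000-18512000
console.log(splitIntoBatches(18500000, 18512000));
```

Persisting the indexer state after each batch means a crash costs at most one batch of rework.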

### 2. Expired Orders Notification Producer

**Purpose**: Notifies users when their orders expire without being filled.

**How it works**:

* Monitors orders approaching expiration
* Checks if orders were filled or cancelled
* Generates notifications for truly expired orders
* Sends to notification queue
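The filtering step above, which separates truly expired orders from filled or cancelled ones, might look like the following sketch. The `TrackedOrder` shape and status values are hypothetical; only `validTo` (a Unix timestamp in seconds) mirrors the field name used by CoW Protocol orders.

```typescript
// Hypothetical order shape; the real tables and fields may differ.
type OrderStatus = 'open' | 'fulfilled' | 'cancelled';

interface TrackedOrder {
  uid: string;
  owner: string;
  validTo: number; // expiry as a Unix timestamp in seconds
  status: OrderStatus;
}

// Keep only orders that passed their expiry while still open.
// Filled and cancelled orders are excluded even if validTo is in the past.
function findExpiredOrders(
  orders: TrackedOrder[],
  nowSeconds: number
): TrackedOrder[] {
  return orders.filter(
    (order) => order.status === 'open' && order.validTo < nowSeconds
  );
}

const sampleOrders: TrackedOrder[] = [
  { uid: '0x01', owner: '0xaaa', validTo: 1000, status: 'open' },
  { uid: '0x02', owner: '0xbbb', validTo: 1000, status: 'fulfilled' },
  { uid: '0x03', owner: '0xccc', validTo: 3000, status: 'open' },
];

console.log(findExpiredOrders(sampleOrders, 2000).map((o) => o.uid)); // [ '0x01' ]
```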

### 3. CMS Notification Producer

**Purpose**: Fetches and distributes notifications from the content management system.

**How it works**:

* Polls CMS for new announcements
* Distributes to all subscribed users
* Handles general protocol updates and announcements
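Distributing one announcement to all subscribed users amounts to a fan-out: one CMS entry becomes one notification per account. The sketch below illustrates the idea only; the `CmsAnnouncement` shape, the `fanOut` helper, and the ID format are all assumptions.

```typescript
// Hypothetical shapes; the real CMS payload and subscription records may differ.
interface CmsAnnouncement {
  id: string;
  title: string;
  message: string;
  url: string;
}

interface AccountNotification extends CmsAnnouncement {
  account: string;
}

// Create one notification per subscribed account, de-duplicating
// addresses case-insensitively so nobody is notified twice.
function fanOut(
  announcement: CmsAnnouncement,
  accounts: string[]
): AccountNotification[] {
  const uniqueAccounts = Array.from(
    new Set(accounts.map((account) => account.toLowerCase()))
  );
  return uniqueAccounts.map((account) => ({
    ...announcement,
    id: `${announcement.id}:${account}`, // illustrative ID scheme
    account,
  }));
}

const fanOutExample = fanOut(
  { id: 'cms-1', title: 'Update', message: 'New chain supported', url: 'https://cow.fi' },
  ['0xAbC', '0xabc', '0xdef']
);
console.log(fanOutExample.length); // 2
```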

## Main Functionality

### Block Indexing

The trade notification producer resumes from the last indexed block and processes new blocks in batches:

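```typescript theme={null}
// 1. Get the last indexed block from the database
const stateRegistry = await indexerStateRepository.get(
  'trade_notification_producer',
  chainId
);

// 2. Get the current block from RPC
const lastBlock = await client.getBlock();

// 3. Process blocks in batches, resuming after the last indexed block.
//    Sketch only: assumes bigint block numbers and a persisted state;
//    MAX_BLOCKS_PER_BATCH and `stateRegistry.state` are illustrative names.
let fromBlock = BigInt(stateRegistry.state.lastBlock) + 1n;
while (fromBlock <= lastBlock.number) {
  const batchEnd = fromBlock + MAX_BLOCKS_PER_BATCH - 1n;
  const toBlock = batchEnd < lastBlock.number ? batchEnd : lastBlock.number;
  await processBlocks(fromBlock, toBlock);
  // Update state after each batch, then advance to the next one
  fromBlock = toBlock + 1n;
}
```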

### Notification Queue

Notifications are sent to RabbitMQ for consumption by delivery services (e.g., Telegram bot):

```typescript theme={null}
// Notification structure
interface Notification {
  id: string;
  account: string;
  title: string;
  message: string;
  url: string;
  context: {
    chainId: number;
    orderUid: string;
    // ... additional metadata
  };
}
```
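A consumer reading from the queue may want to check that a decoded message matches this structure before acting on it. The following type guard is a minimal sketch that assumes every field shown above is required; `PushNotification` and `isPushNotification` are hypothetical names, and the real payload may mark some fields as optional.

```typescript
interface PushNotification {
  id: string;
  account: string;
  title: string;
  message: string;
  url: string;
  context: { chainId: number; orderUid: string; [key: string]: unknown };
}

// Runtime check that an unknown value (e.g. parsed JSON) has the
// fields and primitive types listed in the structure above.
function isPushNotification(value: unknown): value is PushNotification {
  if (typeof value !== 'object' || value === null) {
    return false;
  }
  const record = value as Record<string, unknown>;
  const context = record.context as Record<string, unknown> | null | undefined;

  const stringFields = ['id', 'account', 'title', 'message', 'url'];
  return (
    stringFields.every((field) => typeof record[field] === 'string') &&
    typeof context === 'object' &&
    context !== null &&
    typeof context.chainId === 'number' &&
    typeof context.orderUid === 'string'
  );
}

const decoded: unknown = JSON.parse(
  '{"id":"1","account":"0xabc","title":"Trade","message":"Order filled",' +
    '"url":"https://example.org","context":{"chainId":1,"orderUid":"0x01"}}'
);
console.log(isPushNotification(decoded)); // true
```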

## Dependencies

### Required Services

* **PostgreSQL Database** - Stores indexer state and subscription information. Required tables: `indexer_state`, `push_subscriptions`, `on_chain_placed_orders`, `expired_orders`

* **RabbitMQ** - Message queue for distributing notifications to consumers. Queue: `notifications`

* **Blockchain RPC Nodes** - HTTP or WebSocket endpoints for each monitored chain

### Optional Services

* **Redis** - Caching layer for token metadata and frequently accessed data. Improves performance by reducing redundant API calls and blockchain queries.

## Database Schema

The indexer state table stores progress for each producer:

```sql theme={null}
CREATE TABLE indexer_state (
  key TEXT NOT NULL,
  chain_id INTEGER,
  state JSONB,
  created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
  updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
  UNIQUE (key, chain_id)
);
```

**State Structure**:

```json theme={null}
{
  "lastBlock": "18500000",
  "lastBlockTimestamp": "1698765432",
  "lastBlockHash": "0xabc123..."
}
```
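One plausible use of `lastBlockHash` is reorg detection: if the chain's hash for block `lastBlock` no longer matches the stored hash, the previously indexed block was replaced. This is an assumption about how the state is used, not a description of the producer's code; `IndexerState` and `detectReorg` are hypothetical names.

```typescript
// Mirrors the state structure shown above; values are stored as strings.
interface IndexerState {
  lastBlock: string;
  lastBlockTimestamp: string;
  lastBlockHash: string;
}

// Compare the stored hash with the hash the chain currently reports for
// the same block number. A mismatch suggests the block was reorged out.
// Fetching `onChainHash` for `state.lastBlock` is left to the caller.
function detectReorg(state: IndexerState, onChainHash: string): boolean {
  return onChainHash.toLowerCase() !== state.lastBlockHash.toLowerCase();
}

const storedState: IndexerState = {
  lastBlock: '18500000',
  lastBlockTimestamp: '1698765432',
  lastBlockHash: '0xabc123',
};

console.log(detectReorg(storedState, '0xABC123')); // false
console.log(detectReorg(storedState, '0xdef456')); // true
```

On a mismatch, an indexer would typically step back to an earlier block and re-process from there.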

## Graceful Shutdown

The producer implements graceful shutdown with a 30-second timeout:

```typescript theme={null}
// Signal handlers
process.on('SIGTERM', shutdown);
process.on('SIGINT', shutdown);

// All producers are commanded to stop
// Wait up to 30 seconds for clean shutdown
// Force exit if timeout is reached
```

When shutdown completes within the timeout, this ensures:

* In-progress block processing completes
* Database state is saved
* No notifications are lost
* Clean disconnection from RabbitMQ and database
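The "wait up to 30 seconds, then force exit" behaviour can be sketched by racing the producers' stop promises against a timer. The `Stoppable` interface and `stopAll` function are hypothetical and only illustrate the pattern; the actual producer classes may expose a different API.

```typescript
// Hypothetical minimal interface for anything that can be stopped.
interface Stoppable {
  stop(): Promise<void>;
}

const SHUTDOWN_TIMEOUT_MS = 30000;

// Ask every producer to stop and wait for all of them, but give up
// after `timeoutMs` so a stuck producer cannot block the process forever.
async function stopAll(
  producers: Stoppable[],
  timeoutMs: number = SHUTDOWN_TIMEOUT_MS
): Promise<'clean' | 'timeout'> {
  let timer: ReturnType<typeof setTimeout> | undefined;
  const timedOut = new Promise<'timeout'>((resolve) => {
    timer = setTimeout(() => resolve('timeout'), timeoutMs);
  });
  const stopped = Promise.all(producers.map((p) => p.stop())).then(
    () => 'clean' as const
  );

  try {
    return await Promise.race([stopped, timedOut]);
  } finally {
    // Clear the timer so it does not keep the event loop alive.
    clearTimeout(timer);
  }
}
```

A signal handler could call `stopAll(producers)` and then exit with a non-zero code when the result is `'timeout'`.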

## Testing Notifications

To send a test notification to your subscribed account, run:

```bash theme={null}
# Replace with your Ethereum address
POST_TO_QUEUE_ACCOUNT=0x79063d9173C09887d536924E2F6eADbaBAc099f5 \
  nx test notification-producer \
  --testFile=src/sendPush.test.ts \
  --skip-nx-cache
```

The test runs through the following steps:

1. A test notification is created
2. The notification is sent to the RabbitMQ queue
3. Connected consumers (e.g., Telegram bot) deliver it

## Nx Commands

```bash theme={null}
# Development server
nx start notification-producer

# Build for production
nx build notification-producer

# Run tests
nx test notification-producer

# Lint
nx lint notification-producer

# Docker build
nx docker-build notification-producer
```

## Monitoring and Logs

The producer prefixes each log line with the component name and, for per-chain producers, the chain ID:

```
[notification-producer:main] Start notification producer for networks: 1, 100
[TradeNotificationProducer:1] Indexing from block 18500000 to 18500100: 101 blocks
[TradeNotificationProducer:1] Sending 5 notifications
```

**Log Levels**:

* `trace` - No new blocks to index
* `debug` - Block processing details
* `info` - Notifications sent, producer lifecycle
* `warn` - RPC lag, temporary issues
* `error` - Critical errors, retries
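To make the prefix format and level ordering concrete, here is a small sketch. It assumes the conventional severity order `trace < debug < info < warn < error`; `logPrefix` and `shouldLog` are hypothetical helpers and do not reflect the logging library the producer actually uses.

```typescript
// Levels ordered from least to most severe.
const LOG_LEVELS = ['trace', 'debug', 'info', 'warn', 'error'] as const;
type LogLevel = (typeof LOG_LEVELS)[number];

// Build a prefix such as "[TradeNotificationProducer:1]".
function logPrefix(component: string, scope?: string | number): string {
  return scope === undefined ? `[${component}]` : `[${component}:${scope}]`;
}

// A message is emitted when its level is at or above the configured minimum.
function shouldLog(level: LogLevel, minLevel: LogLevel): boolean {
  return LOG_LEVELS.indexOf(level) >= LOG_LEVELS.indexOf(minLevel);
}

if (shouldLog('info', 'debug')) {
  // Prints: [TradeNotificationProducer:1] Sending 5 notifications
  console.log(`${logPrefix('TradeNotificationProducer', 1)} Sending 5 notifications`);
}
```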

## Related Services

* [Telegram Bot](/bff/services/telegram) - Consumes notifications and delivers via Telegram
* [API Service](/services/api) - Manages user subscriptions
