unkown commit

This commit is contained in:
m.jalmoudy
2025-12-02 19:39:58 +03:30
parent dfb0658064
commit ed5a490387
27 changed files with 1200 additions and 114 deletions

13
.env.example Normal file
View File

@@ -0,0 +1,13 @@
APP_ENV=production
APP_DEBUG=false
APP_LOG_LEVEL=info
CASSANDRA_HOST=cassandra
CASSANDRA_PORT=9042
CASSANDRA_KEYSPACE=event_store
CASSANDRA_USERNAME=
CASSANDRA_PASSWORD=
# Request limits
MAX_REQUEST_SIZE=1048576
REQUEST_TIMEOUT=30

11
.gitignore vendored
View File

@@ -1 +1,10 @@
docker/cassandra /vendor/
/logs/*.log
.env
.DS_Store
composer.lock
*.cache
/docker/cassandra/data/*
/docker/cassandra/commitlog/*
/docker/cassandra/hints/*
/docker/cassandra/saved_caches/*

166
README.md
View File

@@ -1,3 +1,165 @@
# Distributing Carriers # Distributing Carriers Application
Distributing Carriers Application. A production-ready event-sourced application built with DDD, CQRS, and Event Sourcing patterns using PHP 8+ and Apache Cassandra.
## Features
- **Domain-Driven Design (DDD)**: Clean separation of domain, application, and infrastructure layers
- **Event Sourcing**: All state changes captured as immutable events in Cassandra
- **CQRS**: Command-Query Responsibility Segregation with command bus
- **Production-Ready**: Comprehensive error handling, logging, validation, and monitoring
- **Optimistic Concurrency Control**: Version-based conflict detection
- **Retry Logic**: Automatic retry with exponential backoff for transient failures
- **Structured Logging**: JSON-formatted logs with Monolog
- **Input Validation**: Multi-layer validation with detailed error messages
- **Type Safety**: Strict types and value objects for domain integrity
## Requirements
- PHP 8.0 or higher
- Cassandra PHP extension
- Docker & Docker Compose
- Composer
## Installation
1. Clone the repository
2. Copy environment file:
```bash
cp .env.example .env
```
3. Install dependencies:
```bash
composer install
```
4. Start services:
```bash
docker-compose up -d
```
5. Initialize Cassandra schema:
```bash
docker exec -it cassandra cqlsh -e "
CREATE KEYSPACE IF NOT EXISTS event_store
WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};
USE event_store;
CREATE TABLE IF NOT EXISTS events (
aggregate_id text,
version int,
event_type text,
payload text,
created_at timestamp,
PRIMARY KEY (aggregate_id, version)
) WITH CLUSTERING ORDER BY (version ASC);
"
```
## API Endpoints
### Register Carrier
**POST** `/register-carrier`
Request:
```json
{
"name": "FedEx",
"email": "contact@fedex.com"
}
```
Success Response (201):
```json
{
"success": true,
"data": {
"message": "Carrier registered successfully",
"carrier_id": "550e8400-e29b-41d4-a716-446655440000"
}
}
```
Error Response (422):
```json
{
"success": false,
"error": {
"message": "Validation failed",
"code": 422,
"details": {
"email": "The email must be a valid email address"
}
}
}
```
## Architecture
### Layers
- **Domain**: Core business logic, aggregates, events, value objects
- **Application**: Use cases, command handlers, DTOs
- **Infrastructure**: Technical implementations (Cassandra, logging, routing)
### Key Components
- **AggregateRoot**: Base class for domain aggregates with event sourcing
- **CommandBus**: Routes commands to appropriate handlers
- **EventStore**: Persists and retrieves domain events from Cassandra
- **Logger**: Structured logging with rotation and JSON formatting
- **Validator**: Input validation with detailed error messages
## Configuration
Edit `.env` file:
```env
APP_ENV=production
APP_DEBUG=false
APP_LOG_LEVEL=info
CASSANDRA_HOST=cassandra
CASSANDRA_PORT=9042
CASSANDRA_KEYSPACE=event_store
MAX_REQUEST_SIZE=1048576
REQUEST_TIMEOUT=30
```
## Logging
Logs are written to `logs/app.log` in JSON format with automatic rotation (30 days).
Log levels: debug, info, warning, error, critical
## Testing
```bash
composer test
```
## Development
For development mode, set in `.env`:
```env
APP_ENV=development
APP_DEBUG=true
APP_LOG_LEVEL=debug
```
## Production Deployment
1. Set production environment variables
2. Ensure proper Cassandra replication factor
3. Configure log rotation
4. Set up monitoring and alerting
5. Use a reverse proxy (nginx) for SSL termination
6. Enable PHP OPcache
## License
MIT

View File

@@ -4,7 +4,13 @@
"type": "project", "type": "project",
"require": { "require": {
"php": "^8.0", "php": "^8.0",
"ext-cassandra": "*" "ext-cassandra": "*",
"monolog/monolog": "^3.0",
"ramsey/uuid": "^4.7",
"vlucas/phpdotenv": "^5.5"
},
"require-dev": {
"phpunit/phpunit": "^10.0"
}, },
"autoload": { "autoload": {
"psr-4": { "psr-4": {

View File

@@ -1,58 +1,66 @@
version: '3.8'
services: services:
# 🐘 Cassandra Database Container
cassandra: cassandra:
image: cassandra:latest image: cassandra:4.1
container_name: cassandra-db container_name: cassandra
ports: ports:
# Map the CQL native protocol port (required for client connections) - "9042:9042"
- '9042:9042'
volumes:
# Optional: Persist data outside the container
- ./docker/cassandra:/var/lib/cassandra
environment: environment:
# Set the cluster name (optional, but good practice) - CASSANDRA_CLUSTER_NAME=EventStoreCluster
- CASSANDRA_CLUSTER_NAME=MainCluster - CASSANDRA_DC=dc1
# Set the IP address of the node for other nodes to connect to - CASSANDRA_RACK=rack1
- CASSANDRA_BROADCAST_ADDRESS=cassandra-db - CASSANDRA_ENDPOINT_SNITCH=GossipingPropertyFileSnitch
# Set the seed provider to itself for a single-node setup - MAX_HEAP_SIZE=512M
- CASSANDRA_SEEDS=cassandra-db - HEAP_NEWSIZE=128M
volumes:
- ./docker/cassandra/data:/var/lib/cassandra
- ./docker/cassandra/commitlog:/var/lib/cassandra/commitlog
- ./docker/cassandra/hints:/var/lib/cassandra/hints
- ./docker/cassandra/saved_caches:/var/lib/cassandra/saved_caches
healthcheck: healthcheck:
test: [ "CMD-SHELL", "cqlsh -e 'describe cluster' || exit 1" ] test: ["CMD-SHELL", "cqlsh -e 'describe cluster'"]
interval: 30s interval: 30s
timeout: 10s timeout: 10s
retries: 5 retries: 5
start_period: 60s networks:
restart: always - app-network
# PHP-FPM Container php:
app:
build: build:
context: ./docker/php context: ./docker/php
dockerfile: Docker dockerfile: Dockerfile
container_name: php-app container_name: php-fpm
# Mount the source code into the container's web root
volumes: volumes:
- ./src:/var/www/html - ./src:/var/www/html/src
# PHP-FPM runs on port 9000 by default - ./vendor:/var/www/html/vendor
expose: - ./logs:/var/www/html/logs
- '9000' - ./composer.json:/var/www/html/composer.json
- ./.env:/var/www/html/.env
environment:
- APP_ENV=${APP_ENV:-production}
- APP_DEBUG=${APP_DEBUG:-false}
- CASSANDRA_HOST=cassandra
- CASSANDRA_PORT=9042
depends_on: depends_on:
cassandra: cassandra:
condition: service_healthy condition: service_healthy
restart: always networks:
- app-network
# Nginx Web Server Container
nginx: nginx:
image: nginx:stable-alpine image: nginx:alpine
container_name: nginx-web container_name: nginx
ports: ports:
# Map host port 10010 to container port 80 - "8080:80"
- '10010:80'
volumes: volumes:
# Mount the source code (same path as PHP-FPM) - ./src:/var/www/html/src
- ./src:/var/www/html
# Override Nginx default configuration
- ./docker/nginx/default.conf:/etc/nginx/conf.d/default.conf - ./docker/nginx/default.conf:/etc/nginx/conf.d/default.conf
depends_on: depends_on:
- app - php
restart: always networks:
- app-network
networks:
app-network:
driver: bridge

24
docker/cassandra/init.cql Normal file
View File

@@ -0,0 +1,24 @@
-- Create keyspace
CREATE KEYSPACE IF NOT EXISTS event_store
WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};
USE event_store;
-- Create events table
CREATE TABLE IF NOT EXISTS events (
aggregate_id text,
version int,
event_type text,
payload text,
created_at timestamp,
PRIMARY KEY (aggregate_id, version)
) WITH CLUSTERING ORDER BY (version ASC)
AND comment = 'Event store for domain events'
AND compaction = {'class': 'LeveledCompactionStrategy'}
AND gc_grace_seconds = 864000;
-- Create index on event_type for querying
CREATE INDEX IF NOT EXISTS events_event_type_idx ON events (event_type);
-- Create index on created_at for time-based queries
CREATE INDEX IF NOT EXISTS events_created_at_idx ON events (created_at);

View File

@@ -26,7 +26,7 @@ server {
# 3. Block to pass PHP scripts to PHP-FPM # 3. Block to pass PHP scripts to PHP-FPM
location ~ \.php$ { location ~ \.php$ {
# This remains the same to execute any file ending in .php # This remains the same to execute any file ending in .php
fastcgi_pass app:9000; # 'app' is the name of your PHP-FPM service fastcgi_pass php:9000; # 'php' is the name of your PHP-FPM service
fastcgi_index index.php; fastcgi_index index.php;
include fastcgi_params; include fastcgi_params;
fastcgi_param SCRIPT_FILENAME $document_root$fastcgi_script_name; fastcgi_param SCRIPT_FILENAME $document_root$fastcgi_script_name;

0
docker/php/Dockerfile Normal file
View File

12
scripts/init-cassandra.sh Normal file
View File

@@ -0,0 +1,12 @@
#!/bin/bash
echo "Waiting for Cassandra to be ready..."
until docker exec cassandra cqlsh -e "describe cluster" > /dev/null 2>&1; do
echo "Cassandra is unavailable - sleeping"
sleep 5
done
echo "Cassandra is up - initializing schema"
docker exec -i cassandra cqlsh < docker/cassandra/init.cql
echo "Cassandra schema initialized successfully"

View File

@@ -5,24 +5,62 @@ namespace DistributingCarriers\Application\Handlers;
use DistributingCarriers\Application\Commands\RegisterCarrierCommand; use DistributingCarriers\Application\Commands\RegisterCarrierCommand;
use DistributingCarriers\Domain\Aggregates\Carrier; use DistributingCarriers\Domain\Aggregates\Carrier;
use DistributingCarriers\Infrastructure\EventSourcing\EventStore; use DistributingCarriers\Infrastructure\EventSourcing\EventStore;
use DistributingCarriers\Infrastructure\Logging\Logger;
use Ramsey\Uuid\Uuid;
class RegisterCarrierHandler class RegisterCarrierHandler
{ {
private $eventStore; private EventStore $eventStore;
public function __construct(EventStore $eventStore) public function __construct(EventStore $eventStore)
{ {
$this->eventStore = $eventStore; $this->eventStore = $eventStore;
} }
public function __invoke(RegisterCarrierCommand $command): void public function __invoke(RegisterCarrierCommand $command): array
{ {
$carrierId = uniqid('carrier_'); $logger = Logger::getInstance();
$carrier = Carrier::register($carrierId, $command->name, $command->email);
$this->eventStore->save($carrier->getId(), $carrier->getUncommittedChanges(), $carrier->getVersion()); try {
$carrier->markChangesAsCommitted(); // Generate proper UUID for carrier
$carrierId = Uuid::uuid4()->toString();
echo "Carrier registered with ID: $carrierId\n"; $logger->debug('Creating carrier aggregate', [
'carrier_id' => $carrierId,
'name' => $command->name
]);
// Create carrier aggregate
$carrier = Carrier::register($carrierId, $command->name, $command->email);
// Persist events
$this->eventStore->save(
$carrier->getId(),
$carrier->getUncommittedChanges(),
$carrier->getVersion()
);
$carrier->markChangesAsCommitted();
$logger->info('Carrier registered successfully', [
'carrier_id' => $carrierId,
'name' => $command->name,
'email' => $command->email
]);
return [
'carrier_id' => $carrierId,
'name' => $command->name,
'email' => $command->email
];
} catch (\Exception $e) {
$logger->error('Failed to register carrier', [
'error' => $e->getMessage(),
'name' => $command->name,
'email' => $command->email
]);
throw $e;
}
} }
} }

View File

@@ -0,0 +1,63 @@
<?php
declare(strict_types=1);
namespace DistributingCarriers\Application\Http;
use DistributingCarriers\Infrastructure\Routing\IRequestHandler;
use DistributingCarriers\Infrastructure\Http\JsonResponse;
use DistributingCarriers\Infrastructure\EventSourcing\EventStore;
use DistributingCarriers\Infrastructure\Logging\Logger;
use DistributingCarriers\Domain\Aggregates\Carrier;
class GetCarrierHandler implements IRequestHandler
{
private EventStore $eventStore;
private string $carrierId;
public function __construct(EventStore $eventStore, string $carrierId)
{
$this->eventStore = $eventStore;
$this->carrierId = $carrierId;
}
public function handle(): void
{
$logger = Logger::getInstance();
try {
$logger->debug('Fetching carrier', ['carrier_id' => $this->carrierId]);
$events = $this->eventStore->getEventsForAggregate($this->carrierId);
if (empty($events)) {
$logger->info('Carrier not found', ['carrier_id' => $this->carrierId]);
JsonResponse::error('Carrier not found', 404);
return;
}
$carrier = new Carrier();
$carrier->loadFromHistory($events);
$reflection = new \ReflectionClass($carrier);
$nameProperty = $reflection->getProperty('name');
$nameProperty->setAccessible(true);
$emailProperty = $reflection->getProperty('email');
$emailProperty->setAccessible(true);
JsonResponse::success([
'carrier_id' => $carrier->getId(),
'name' => $nameProperty->getValue($carrier),
'email' => $emailProperty->getValue($carrier),
'version' => $carrier->getVersion()
]);
} catch (\Exception $e) {
$logger->error('Failed to fetch carrier', [
'carrier_id' => $this->carrierId,
'error' => $e->getMessage()
]);
JsonResponse::error('Failed to fetch carrier', 500);
}
}
}

View File

@@ -0,0 +1,45 @@
<?php
declare(strict_types=1);
namespace DistributingCarriers\Application\Http;
use DistributingCarriers\Infrastructure\Routing\IRequestHandler;
use DistributingCarriers\Infrastructure\Http\JsonResponse;
class HealthCheckHandler implements IRequestHandler
{
private $session;
public function __construct($session = null)
{
$this->session = $session;
}
public function handle(): void
{
$health = [
'status' => 'healthy',
'timestamp' => date('c'),
'checks' => []
];
// Check Cassandra connection
if ($this->session !== null) {
try {
$this->session->execute(new \Cassandra\SimpleStatement('SELECT now() FROM system.local'));
$health['checks']['cassandra'] = 'healthy';
} catch (\Exception $e) {
$health['checks']['cassandra'] = 'unhealthy';
$health['status'] = 'degraded';
}
}
// Check logs directory
$logsDir = __DIR__ . '/../../../logs';
$health['checks']['logs'] = is_writable($logsDir) ? 'healthy' : 'unhealthy';
$statusCode = $health['status'] === 'healthy' ? 200 : 503;
JsonResponse::success($health, $statusCode);
}
}

View File

@@ -0,0 +1,103 @@
<?php
namespace DistributingCarriers\Application\Http;
use DistributingCarriers\Application\Commands\RegisterCarrierCommand;
use DistributingCarriers\Infrastructure\Messaging\CommandBus;
use DistributingCarriers\Infrastructure\Routing\IRequestHandler;
use DistributingCarriers\Infrastructure\Http\JsonResponse;
use DistributingCarriers\Infrastructure\Validation\Validator;
use DistributingCarriers\Infrastructure\Exceptions\ValidationException;
use DistributingCarriers\Infrastructure\Logging\Logger;
class RegisterCarrierRequestHandler implements IRequestHandler
{
private CommandBus $commandBus;
public function __construct(CommandBus $commandBus)
{
$this->commandBus = $commandBus;
}
public function handle(): void
{
$logger = Logger::getInstance();
try {
// Read and validate request size
$maxSize = (int)(getenv('MAX_REQUEST_SIZE') ?: 1048576); // 1MB default
$contentLength = (int)($_SERVER['CONTENT_LENGTH'] ?? 0);
if ($contentLength > $maxSize) {
$logger->warning('Request payload too large', ['size' => $contentLength]);
JsonResponse::error('Request payload too large', 413);
return;
}
// Read raw input
$rawInput = file_get_contents('php://input');
if ($rawInput === false || $rawInput === '') {
$logger->warning('Empty request body received');
JsonResponse::error('Request body is required', 400);
return;
}
// Parse JSON
$input = json_decode($rawInput, true);
if (json_last_error() !== JSON_ERROR_NONE) {
$logger->warning('Invalid JSON received', ['error' => json_last_error_msg()]);
JsonResponse::error('Invalid JSON: ' . json_last_error_msg(), 400);
return;
}
// Validate input
$validator = new Validator();
$validator
->validateRequired($input, ['name', 'email'])
->validateEmail($input, 'email')
->validateMinLength($input, 'name', 2)
->validateMaxLength($input, 'name', 255)
->validateMaxLength($input, 'email', 255);
if ($validator->fails()) {
$logger->info('Validation failed', ['errors' => $validator->getErrors()]);
JsonResponse::error('Validation failed', 422, $validator->getErrors());
return;
}
// Sanitize input
$name = trim($input['name']);
$email = strtolower(trim($input['email']));
$logger->info('Registering carrier', ['name' => $name, 'email' => $email]);
// Dispatch command
$command = new RegisterCarrierCommand($name, $email);
$result = $this->commandBus->dispatch($command);
$logger->info('Carrier registered successfully', ['carrier_id' => $result['carrier_id'] ?? null]);
JsonResponse::success([
'message' => 'Carrier registered successfully',
'carrier_id' => $result['carrier_id'] ?? null
], 201);
} catch (ValidationException $e) {
$logger->warning('Validation exception', ['errors' => $e->getErrors()]);
JsonResponse::error($e->getMessage(), 422, $e->getErrors());
} catch (\Exception $e) {
$logger->error('Unexpected error during carrier registration', [
'error' => $e->getMessage(),
'trace' => $e->getTraceAsString()
]);
if (getenv('APP_DEBUG') === 'true') {
JsonResponse::error('Internal server error: ' . $e->getMessage(), 500);
} else {
JsonResponse::error('An unexpected error occurred', 500);
}
}
}
}

View File

@@ -0,0 +1,40 @@
<?php
namespace DistributingCarriers\Domain\ValueObjects;
use DistributingCarriers\Infrastructure\Exceptions\DomainException;
final class CarrierName
{
private string $value;
public function __construct(string $name)
{
$name = trim($name);
if (strlen($name) < 2) {
throw new DomainException("Carrier name must be at least 2 characters");
}
if (strlen($name) > 255) {
throw new DomainException("Carrier name must not exceed 255 characters");
}
$this->value = $name;
}
public function getValue(): string
{
return $this->value;
}
public function __toString(): string
{
return $this->value;
}
public function equals(CarrierName $other): bool
{
return $this->value === $other->value;
}
}

View File

@@ -0,0 +1,36 @@
<?php
namespace DistributingCarriers\Domain\ValueObjects;
use DistributingCarriers\Infrastructure\Exceptions\DomainException;
final class Email
{
private string $value;
public function __construct(string $email)
{
$email = strtolower(trim($email));
if (!filter_var($email, FILTER_VALIDATE_EMAIL)) {
throw new DomainException("Invalid email address: {$email}");
}
$this->value = $email;
}
public function getValue(): string
{
return $this->value;
}
public function __toString(): string
{
return $this->value;
}
public function equals(Email $other): bool
{
return $this->value === $other->value;
}
}

View File

@@ -3,69 +3,167 @@
namespace DistributingCarriers\Infrastructure\EventSourcing; namespace DistributingCarriers\Infrastructure\EventSourcing;
use Cassandra; use Cassandra;
use DistributingCarriers\Infrastructure\Exceptions\ConcurrencyException;
use DistributingCarriers\Infrastructure\Logging\Logger;
class CassandraEventStore implements EventStore class CassandraEventStore implements EventStore
{ {
private $session; private Cassandra\Session $session;
private int $maxRetries = 3;
public function __construct($session) public function __construct(Cassandra\Session $session)
{ {
$this->session = $session; $this->session = $session;
} }
public function save(string $aggregateId, array $events, int $expectedVersion): void public function save(string $aggregateId, array $events, int $expectedVersion): void
{ {
// In a real implementation, we would check the version for concurrency control. $logger = Logger::getInstance();
// For this example, we'll just append.
$statement = $this->session->prepare( if (empty($events)) {
"INSERT INTO events (aggregate_id, version, event_type, payload, created_at) VALUES (?, ?, ?, ?, ?)" $logger->warning('Attempted to save empty events array', ['aggregate_id' => $aggregateId]);
); return;
}
foreach ($events as $event) { try {
$version = ++$expectedVersion; // Check current version for optimistic concurrency control
$payload = json_encode($event); $currentVersion = $this->getCurrentVersion($aggregateId);
$eventType = get_class($event);
$createdAt = new Cassandra\Timestamp();
$this->session->execute($statement, [ if ($currentVersion !== $expectedVersion) {
'arguments' => [ throw new ConcurrencyException(
"Concurrency conflict for aggregate {$aggregateId}. " .
"Expected version {$expectedVersion}, but current version is {$currentVersion}"
);
}
$statement = $this->session->prepare(
"INSERT INTO events (aggregate_id, version, event_type, payload, created_at) VALUES (?, ?, ?, ?, ?)"
);
$batch = new Cassandra\BatchStatement(Cassandra::BATCH_LOGGED);
foreach ($events as $event) {
$version = ++$expectedVersion;
$eventType = \get_class($event);
$payload = json_encode($event, JSON_THROW_ON_ERROR);
$createdAt = new Cassandra\Timestamp();
$batch->add($statement, [
$aggregateId, $aggregateId,
$version, $version,
$eventType, $eventType,
$payload, $payload,
$createdAt $createdAt
] ]);
$logger->debug('Event added to batch', [
'aggregate_id' => $aggregateId,
'version' => $version,
'event_type' => $eventType
]);
}
// Execute batch with retry logic
$this->executeWithRetry($batch);
$logger->info('Events saved successfully', [
'aggregate_id' => $aggregateId,
'event_count' => count($events),
'final_version' => $expectedVersion
]); ]);
} catch (ConcurrencyException $e) {
$logger->warning('Concurrency conflict detected', [
'aggregate_id' => $aggregateId,
'expected_version' => $expectedVersion
]);
throw $e;
} catch (\Exception $e) {
$logger->error('Failed to save events', [
'aggregate_id' => $aggregateId,
'error' => $e->getMessage()
]);
throw new \RuntimeException("Failed to save events: " . $e->getMessage(), 0, $e);
} }
} }
public function getEventsForAggregate(string $aggregateId): array public function getEventsForAggregate(string $aggregateId): array
{
$logger = Logger::getInstance();
try {
$statement = $this->session->prepare(
"SELECT event_type, payload, version, created_at FROM events WHERE aggregate_id = ? ORDER BY version ASC"
);
$result = $this->session->execute($statement, ['arguments' => [$aggregateId]]);
$events = [];
foreach ($result as $row) {
$eventType = $row['event_type'];
$payload = json_decode($row['payload'], true, 512, JSON_THROW_ON_ERROR);
if (class_exists($eventType)) {
$events[] = new $eventType($payload);
} else {
$logger->warning('Event class not found', [
'event_type' => $eventType,
'aggregate_id' => $aggregateId
]);
}
}
$logger->debug('Events loaded for aggregate', [
'aggregate_id' => $aggregateId,
'event_count' => count($events)
]);
return $events;
} catch (\Exception $e) {
$logger->error('Failed to load events', [
'aggregate_id' => $aggregateId,
'error' => $e->getMessage()
]);
throw new \RuntimeException("Failed to load events: " . $e->getMessage(), 0, $e);
}
}
private function getCurrentVersion(string $aggregateId): int
{ {
$statement = $this->session->prepare( $statement = $this->session->prepare(
"SELECT event_type, payload FROM events WHERE aggregate_id = ?" "SELECT MAX(version) as max_version FROM events WHERE aggregate_id = ?"
); );
$result = $this->session->execute($statement, ['arguments' => [$aggregateId]]); $result = $this->session->execute($statement, ['arguments' => [$aggregateId]]);
$row = $result->first();
$events = []; return $row['max_version'] ?? 0;
foreach ($result as $row) { }
$eventType = $row['event_type'];
$payload = json_decode($row['payload'], true);
// Assuming we can reconstruct the event object from payload private function executeWithRetry(Cassandra\BatchStatement $batch): void
// This is a simplified version. {
if (class_exists($eventType)) { $attempt = 0;
// In a real app, you might use a serializer/deserializer $lastException = null;
// Here we assume the event has a static fromArray or similar, or just public properties
// For simplicity, let's assume we just return the data or a generic object if class doesn't handle it while ($attempt < $this->maxRetries) {
// But for the plan, let's try to instantiate. try {
// We'll leave it as an array or basic object for now if complex reconstruction is needed. $this->session->execute($batch);
// Let's assume the event class has a constructor that takes the payload. return;
$events[] = new $eventType($payload); } catch (Cassandra\Exception\WriteTimeoutException $e) {
$lastException = $e;
$attempt++;
if ($attempt < $this->maxRetries) {
usleep(100000 * $attempt); // Exponential backoff: 100ms, 200ms, 300ms
}
} }
} }
return $events; throw new \RuntimeException(
"Failed to execute batch after {$this->maxRetries} attempts",
0,
$lastException
);
} }
} }

View File

@@ -0,0 +1,7 @@
<?php
namespace DistributingCarriers\Infrastructure\Exceptions;
class ConcurrencyException extends \Exception
{
}

View File

@@ -0,0 +1,7 @@
<?php
namespace DistributingCarriers\Infrastructure\Exceptions;
class DomainException extends \Exception
{
}

View File

@@ -0,0 +1,19 @@
<?php
namespace DistributingCarriers\Infrastructure\Exceptions;
class ValidationException extends \Exception
{
private array $errors;
public function __construct(array $errors, string $message = 'Validation failed')
{
parent::__construct($message);
$this->errors = $errors;
}
public function getErrors(): array
{
return $this->errors;
}
}

View File

@@ -0,0 +1,35 @@
<?php
namespace DistributingCarriers\Infrastructure\Http;
class JsonResponse
{
public static function success(array $data, int $statusCode = 200): void
{
self::send(['success' => true, 'data' => $data], $statusCode);
}
public static function error(string $message, int $statusCode = 400, ?array $details = null): void
{
$response = [
'success' => false,
'error' => [
'message' => $message,
'code' => $statusCode
]
];
if ($details !== null) {
$response['error']['details'] = $details;
}
self::send($response, $statusCode);
}
private static function send(array $data, int $statusCode): void
{
http_response_code($statusCode);
header('Content-Type: application/json; charset=utf-8');
echo json_encode($data, JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES);
}
}

View File

@@ -0,0 +1,38 @@
<?php
namespace DistributingCarriers\Infrastructure\Logging;
use Monolog\Logger as MonologLogger;
use Monolog\Handler\StreamHandler;
use Monolog\Handler\RotatingFileHandler;
use Monolog\Formatter\JsonFormatter;
class Logger
{
private static ?MonologLogger $instance = null;
public static function getInstance(): MonologLogger
{
if (self::$instance === null) {
self::$instance = new MonologLogger('app');
$logLevel = match(getenv('APP_LOG_LEVEL') ?: 'info') {
'debug' => MonologLogger::DEBUG,
'info' => MonologLogger::INFO,
'warning' => MonologLogger::WARNING,
'error' => MonologLogger::ERROR,
default => MonologLogger::INFO,
};
$handler = new RotatingFileHandler(__DIR__ . '/../../../logs/app.log', 30, $logLevel);
$handler->setFormatter(new JsonFormatter());
self::$instance->pushHandler($handler);
if (getenv('APP_ENV') === 'development') {
self::$instance->pushHandler(new StreamHandler('php://stdout', MonologLogger::DEBUG));
}
}
return self::$instance;
}
}

View File

@@ -2,23 +2,42 @@
namespace DistributingCarriers\Infrastructure\Messaging; namespace DistributingCarriers\Infrastructure\Messaging;
use DistributingCarriers\Infrastructure\Logging\Logger;
class CommandBus class CommandBus
{ {
private $handlers = []; private array $handlers = [];
public function register(string $commandClass, callable $handler): void public function register(string $commandClass, callable $handler): void
{ {
$this->handlers[$commandClass] = $handler; $this->handlers[$commandClass] = $handler;
} }
public function dispatch(object $command): void public function dispatch(object $command): mixed
{ {
$commandClass = \get_class($command); $commandClass = \get_class($command);
$logger = Logger::getInstance();
if (!isset($this->handlers[$commandClass])) { if (!isset($this->handlers[$commandClass])) {
$logger->error('No handler registered for command', ['command' => $commandClass]);
throw new \Exception("No handler registered for command: $commandClass"); throw new \Exception("No handler registered for command: $commandClass");
} }
$handler = $this->handlers[$commandClass]; $logger->debug('Dispatching command', ['command' => $commandClass]);
$handler($command);
try {
$handler = $this->handlers[$commandClass];
$result = $handler($command);
$logger->debug('Command dispatched successfully', ['command' => $commandClass]);
return $result;
} catch (\Exception $e) {
$logger->error('Command dispatch failed', [
'command' => $commandClass,
'error' => $e->getMessage()
]);
throw $e;
}
} }
} }

View File

@@ -0,0 +1,28 @@
<?php
declare(strict_types=1);
namespace DistributingCarriers\Infrastructure\Middleware;
use DistributingCarriers\Infrastructure\Routing\IMiddleware;
use DistributingCarriers\Infrastructure\Routing\IRequestHandler;
class CorsMiddleware implements IMiddleware
{
public function process(IRequestHandler $handler): void
{
$origin = $_SERVER['HTTP_ORIGIN'] ?? '*';
header("Access-Control-Allow-Origin: $origin");
header('Access-Control-Allow-Methods: GET, POST, PUT, DELETE, OPTIONS');
header('Access-Control-Allow-Headers: Content-Type, Authorization, X-Request-ID');
header('Access-Control-Max-Age: 86400');
if ($_SERVER['REQUEST_METHOD'] === 'OPTIONS') {
http_response_code(204);
exit;
}
$handler->handle();
}
}

View File

@@ -0,0 +1,71 @@
<?php
declare(strict_types=1);
namespace DistributingCarriers\Infrastructure\Middleware;
use DistributingCarriers\Infrastructure\Routing\IMiddleware;
use DistributingCarriers\Infrastructure\Routing\IRequestHandler;
use DistributingCarriers\Infrastructure\Http\JsonResponse;
use DistributingCarriers\Infrastructure\Logging\Logger;
class RateLimitMiddleware implements IMiddleware
{
private int $maxRequests;
private int $windowSeconds;
private string $storageDir;
public function __construct(int $maxRequests = 100, int $windowSeconds = 60)
{
$this->maxRequests = $maxRequests;
$this->windowSeconds = $windowSeconds;
$this->storageDir = sys_get_temp_dir() . '/rate_limits';
if (!is_dir($this->storageDir)) {
mkdir($this->storageDir, 0777, true);
}
}
public function process(IRequestHandler $handler): void
{
$clientIp = $_SERVER['REMOTE_ADDR'] ?? 'unknown';
$key = md5($clientIp);
$file = $this->storageDir . '/' . $key;
$now = time();
$requests = $this->getRequests($file, $now);
if (\count($requests) >= $this->maxRequests) {
$logger = Logger::getInstance();
$logger->warning('Rate limit exceeded', ['ip' => $clientIp]);
header('X-RateLimit-Limit: ' . $this->maxRequests);
header('X-RateLimit-Remaining: 0');
header('X-RateLimit-Reset: ' . ($now + $this->windowSeconds));
JsonResponse::error('Rate limit exceeded', 429);
exit;
}
$requests[] = $now;
file_put_contents($file, json_encode($requests));
header('X-RateLimit-Limit: ' . $this->maxRequests);
header('X-RateLimit-Remaining: ' . ($this->maxRequests - \count($requests)));
header('X-RateLimit-Reset: ' . ($now + $this->windowSeconds));
$handler->handle();
}
private function getRequests(string $file, int $now): array
{
if (!file_exists($file)) {
return [];
}
$content = file_get_contents($file);
$requests = json_decode($content, true) ?: [];
return array_filter($requests, fn($timestamp) => $timestamp > ($now - $this->windowSeconds));
}
}

View File

@@ -0,0 +1,23 @@
<?php
declare(strict_types=1);
namespace DistributingCarriers\Infrastructure\Middleware;
use DistributingCarriers\Infrastructure\Routing\IMiddleware;
use DistributingCarriers\Infrastructure\Routing\IRequestHandler;
use Ramsey\Uuid\Uuid;
class RequestIdMiddleware implements IMiddleware
{
public function process(IRequestHandler $handler): void
{
$requestId = $_SERVER['HTTP_X_REQUEST_ID'] ?? Uuid::uuid4()->toString();
header("X-Request-ID: $requestId");
$_SERVER['REQUEST_ID'] = $requestId;
$handler->handle();
}
}

View File

@@ -0,0 +1,52 @@
<?php
namespace DistributingCarriers\Infrastructure\Validation;
class Validator
{
private array $errors = [];
public function validateRequired(array $data, array $requiredFields): self
{
foreach ($requiredFields as $field) {
if (!isset($data[$field]) || trim($data[$field]) === '') {
$this->errors[$field] = "The {$field} field is required";
}
}
return $this;
}
public function validateEmail(array $data, string $field): self
{
if (isset($data[$field]) && !filter_var($data[$field], FILTER_VALIDATE_EMAIL)) {
$this->errors[$field] = "The {$field} must be a valid email address";
}
return $this;
}
public function validateMaxLength(array $data, string $field, int $maxLength): self
{
if (isset($data[$field]) && strlen($data[$field]) > $maxLength) {
$this->errors[$field] = "The {$field} must not exceed {$maxLength} characters";
}
return $this;
}
public function validateMinLength(array $data, string $field, int $minLength): self
{
if (isset($data[$field]) && strlen($data[$field]) < $minLength) {
$this->errors[$field] = "The {$field} must be at least {$minLength} characters";
}
return $this;
}
public function fails(): bool
{
return !empty($this->errors);
}
public function getErrors(): array
{
return $this->errors;
}
}

View File

@@ -1,5 +1,7 @@
<?php <?php
declare(strict_types=1);
require_once __DIR__ . '/../vendor/autoload.php'; require_once __DIR__ . '/../vendor/autoload.php';
use DistributingCarriers\Infrastructure\Messaging\CommandBus; use DistributingCarriers\Infrastructure\Messaging\CommandBus;
@@ -7,42 +9,170 @@ use DistributingCarriers\Infrastructure\Routing\Router;
use DistributingCarriers\Infrastructure\EventSourcing\CassandraEventStore; use DistributingCarriers\Infrastructure\EventSourcing\CassandraEventStore;
use DistributingCarriers\Application\Commands\RegisterCarrierCommand; use DistributingCarriers\Application\Commands\RegisterCarrierCommand;
use DistributingCarriers\Application\Handlers\RegisterCarrierHandler; use DistributingCarriers\Application\Handlers\RegisterCarrierHandler;
use DistributingCarriers\Application\Http\RegisterCarrierRequestHandler;
use DistributingCarriers\Application\Http\HealthCheckHandler;
use DistributingCarriers\Application\Http\GetCarrierHandler;
use DistributingCarriers\Infrastructure\Logging\Logger;
use DistributingCarriers\Infrastructure\Http\JsonResponse;
use DistributingCarriers\Infrastructure\Middleware\CorsMiddleware;
use DistributingCarriers\Infrastructure\Middleware\RequestIdMiddleware;
use DistributingCarriers\Infrastructure\Middleware\RateLimitMiddleware;
use Dotenv\Dotenv;
// Initialize Cassandra Connection // Load environment variables
$cluster = Cassandra::cluster() if (file_exists(__DIR__ . '/../.env')) {
->withContactPoints(getenv('CASSANDRA_HOST') ?: 'cassandra') $dotenv = Dotenv::createImmutable(__DIR__ . '/..');
->withPort((int)(getenv('CASSANDRA_PORT') ?: 9042)) $dotenv->load();
->build(); }
$session = $cluster->connect('event_store'); // Set error handling
error_reporting(E_ALL);
ini_set('display_errors', getenv('APP_DEBUG') === 'true' ? '1' : '0');
ini_set('log_errors', '1');
ini_set('error_log', __DIR__ . '/../logs/php_errors.log');
// Initialize Infrastructure // Set default timezone
$eventStore = new CassandraEventStore($session); date_default_timezone_set('UTC');
$commandBus = new CommandBus();
// Register Handlers // Global exception handler
$commandBus->register(RegisterCarrierCommand::class, new RegisterCarrierHandler($eventStore)); set_exception_handler(function (Throwable $e) {
$logger = Logger::getInstance();
$logger->error('Uncaught exception', [
'message' => $e->getMessage(),
'file' => $e->getFile(),
'line' => $e->getLine(),
'trace' => $e->getTraceAsString()
]);
// Router if (getenv('APP_DEBUG') === 'true') {
$router = new Router(); JsonResponse::error('Internal server error: ' . $e->getMessage(), 500);
} else {
$router->add('POST', '/register-carrier', function () use ($commandBus) { JsonResponse::error('An unexpected error occurred', 500);
$input = json_decode(file_get_contents('php://input'), true);
if (!isset($input['name']) || !isset($input['email'])) {
http_response_code(400);
echo json_encode(['error' => 'Missing required fields: name, email']);
return;
} }
$command = new RegisterCarrierCommand($input['name'], $input['email']);
$commandBus->dispatch($command);
http_response_code(201);
echo json_encode(['message' => 'Carrier registered successfully']);
}); });
$method = $_SERVER['REQUEST_METHOD'] ?? 'GET'; // Global error handler
$uri = parse_url($_SERVER['REQUEST_URI'] ?? '/', PHP_URL_PATH); set_error_handler(function (int $errno, string $errstr, string $errfile, int $errline) {
if (!(error_reporting() & $errno)) {
return false;
}
$router->dispatch($method, $uri); $logger = Logger::getInstance();
$logger->error('PHP Error', [
'errno' => $errno,
'errstr' => $errstr,
'errfile' => $errfile,
'errline' => $errline
]);
throw new ErrorException($errstr, 0, $errno, $errfile, $errline);
});
try {
$logger = Logger::getInstance();
$logger->info('Application starting', [
'method' => $_SERVER['REQUEST_METHOD'] ?? 'UNKNOWN',
'uri' => $_SERVER['REQUEST_URI'] ?? '/',
'ip' => $_SERVER['REMOTE_ADDR'] ?? 'unknown'
]);
// Initialize Cassandra Connection with retry logic
$maxRetries = 3;
$retryDelay = 1000000; // 1 second
$cluster = null;
$session = null;
for ($attempt = 1; $attempt <= $maxRetries; $attempt++) {
try {
$clusterBuilder = Cassandra::cluster()
->withContactPoints(getenv('CASSANDRA_HOST') ?: 'cassandra')
->withPort((int)(getenv('CASSANDRA_PORT') ?: 9042))
->withConnectTimeout(10)
->withRequestTimeout(10);
// Add credentials if provided
$username = getenv('CASSANDRA_USERNAME');
$password = getenv('CASSANDRA_PASSWORD');
if ($username && $password) {
$clusterBuilder->withCredentials($username, $password);
}
$cluster = $clusterBuilder->build();
$keyspace = getenv('CASSANDRA_KEYSPACE') ?: 'event_store';
$session = $cluster->connect($keyspace);
$logger->info('Connected to Cassandra', [
'keyspace' => $keyspace,
'attempt' => $attempt
]);
break;
} catch (Exception $e) {
$logger->warning('Failed to connect to Cassandra', [
'attempt' => $attempt,
'max_retries' => $maxRetries,
'error' => $e->getMessage()
]);
if ($attempt === $maxRetries) {
throw new RuntimeException('Could not connect to Cassandra after ' . $maxRetries . ' attempts', 0, $e);
}
usleep($retryDelay * $attempt);
}
}
// Initialize Infrastructure
$eventStore = new CassandraEventStore($session);
$commandBus = new CommandBus();
// Register Handlers
$commandBus->register(RegisterCarrierCommand::class, new RegisterCarrierHandler($eventStore));
// Router
$router = new Router();
// Add global middlewares
$router->addGlobalMiddleware(new RequestIdMiddleware());
$router->addGlobalMiddleware(new CorsMiddleware());
// Add routes
$router->add('GET', '/health', new HealthCheckHandler($session));
$router->add('POST', '/register-carrier', new RegisterCarrierRequestHandler($commandBus), [
new RateLimitMiddleware(100, 60)
]);
// Add GET carrier route with dynamic ID
$method = $_SERVER['REQUEST_METHOD'] ?? 'GET';
$uri = parse_url($_SERVER['REQUEST_URI'] ?? '/', PHP_URL_PATH);
// Handle GET /carriers/{id}
if ($method === 'GET' && preg_match('#^/carriers/([a-f0-9\-]{36})$#', $uri, $matches)) {
$carrierId = $matches[1];
$handler = new GetCarrierHandler($eventStore, $carrierId);
$router->add('GET', $uri, $handler);
}
$logger->debug('Dispatching request', [
'method' => $method,
'uri' => $uri,
'request_id' => $_SERVER['REQUEST_ID'] ?? 'unknown'
]);
$router->dispatch($method, $uri);
$logger->info('Request completed successfully');
} catch (Throwable $e) {
$logger = Logger::getInstance();
$logger->critical('Application failed to start', [
'error' => $e->getMessage(),
'trace' => $e->getTraceAsString()
]);
if (getenv('APP_DEBUG') === 'true') {
JsonResponse::error('Application initialization failed: ' . $e->getMessage(), 500);
} else {
JsonResponse::error('Service temporarily unavailable', 503);
}
}