diff --git a/.env.example b/.env.example new file mode 100644 index 0000000..40e8606 --- /dev/null +++ b/.env.example @@ -0,0 +1,13 @@ +APP_ENV=production +APP_DEBUG=false +APP_LOG_LEVEL=info + +CASSANDRA_HOST=cassandra +CASSANDRA_PORT=9042 +CASSANDRA_KEYSPACE=event_store +CASSANDRA_USERNAME= +CASSANDRA_PASSWORD= + +# Request limits +MAX_REQUEST_SIZE=1048576 +REQUEST_TIMEOUT=30 diff --git a/.gitignore b/.gitignore index ff5efa3..52a6a3b 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,10 @@ -docker/cassandra \ No newline at end of file +/vendor/ +/logs/*.log +.env +.DS_Store +composer.lock +*.cache +/docker/cassandra/data/* +/docker/cassandra/commitlog/* +/docker/cassandra/hints/* +/docker/cassandra/saved_caches/* diff --git a/README.md b/README.md index 233fa84..0cbc217 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,165 @@ -# Distributing Carriers +# Distributing Carriers Application -Distributing Carriers Application. +A production-ready event-sourced application built with DDD, CQRS, and Event Sourcing patterns using PHP 8+ and Apache Cassandra. + +## Features + +- **Domain-Driven Design (DDD)**: Clean separation of domain, application, and infrastructure layers +- **Event Sourcing**: All state changes captured as immutable events in Cassandra +- **CQRS**: Command-Query Responsibility Segregation with command bus +- **Production-Ready**: Comprehensive error handling, logging, validation, and monitoring +- **Optimistic Concurrency Control**: Version-based conflict detection +- **Retry Logic**: Automatic retry with exponential backoff for transient failures +- **Structured Logging**: JSON-formatted logs with Monolog +- **Input Validation**: Multi-layer validation with detailed error messages +- **Type Safety**: Strict types and value objects for domain integrity + +## Requirements + +- PHP 8.0 or higher +- Cassandra PHP extension +- Docker & Docker Compose +- Composer + +## Installation + +1. Clone the repository +2. Copy environment file: + ```bash + cp .env.example .env + ``` + +3. Install dependencies: + ```bash + composer install + ``` + +4. Start services: + ```bash + docker-compose up -d + ``` + +5. Initialize Cassandra schema: + ```bash + docker exec -it cassandra cqlsh -e " + CREATE KEYSPACE IF NOT EXISTS event_store + WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}; + + USE event_store; + + CREATE TABLE IF NOT EXISTS events ( + aggregate_id text, + version int, + event_type text, + payload text, + created_at timestamp, + PRIMARY KEY (aggregate_id, version) + ) WITH CLUSTERING ORDER BY (version ASC); + " + ``` + +## API Endpoints + +### Register Carrier + +**POST** `/register-carrier` + +Request: +```json +{ + "name": "FedEx", + "email": "contact@fedex.com" +} +``` + +Success Response (201): +```json +{ + "success": true, + "data": { + "message": "Carrier registered successfully", + "carrier_id": "550e8400-e29b-41d4-a716-446655440000" + } +} +``` + +Error Response (422): +```json +{ + "success": false, + "error": { + "message": "Validation failed", + "code": 422, + "details": { + "email": "The email must be a valid email address" + } + } +} +``` + +## Architecture + +### Layers + +- **Domain**: Core business logic, aggregates, events, value objects +- **Application**: Use cases, command handlers, DTOs +- **Infrastructure**: Technical implementations (Cassandra, logging, routing) + +### Key Components + +- **AggregateRoot**: Base class for domain aggregates with event sourcing +- **CommandBus**: Routes commands to appropriate handlers +- **EventStore**: Persists and retrieves domain events from Cassandra +- **Logger**: Structured logging with rotation and JSON formatting +- **Validator**: Input validation with detailed error messages + +## Configuration + +Edit `.env` file: + +```env +APP_ENV=production +APP_DEBUG=false +APP_LOG_LEVEL=info + +CASSANDRA_HOST=cassandra +CASSANDRA_PORT=9042 +CASSANDRA_KEYSPACE=event_store + +MAX_REQUEST_SIZE=1048576 +REQUEST_TIMEOUT=30 +``` + +## Logging + +Logs are written to `logs/app.log` in JSON format with automatic rotation (30 days). + +Log levels: debug, info, warning, error, critical + +## Testing + +```bash +composer test +``` + +## Development + +For development mode, set in `.env`: +```env +APP_ENV=development +APP_DEBUG=true +APP_LOG_LEVEL=debug +``` + +## Production Deployment + +1. Set production environment variables +2. Ensure proper Cassandra replication factor +3. Configure log rotation +4. Set up monitoring and alerting +5. Use a reverse proxy (nginx) for SSL termination +6. Enable PHP OPcache + +## License + +MIT diff --git a/composer.json b/composer.json index 7b8fc8e..46e33e3 100644 --- a/composer.json +++ b/composer.json @@ -4,7 +4,13 @@ "type": "project", "require": { "php": "^8.0", - "ext-cassandra": "*" + "ext-cassandra": "*", + "monolog/monolog": "^3.0", + "ramsey/uuid": "^4.7", + "vlucas/phpdotenv": "^5.5" + }, + "require-dev": { + "phpunit/phpunit": "^10.0" }, "autoload": { "psr-4": { diff --git a/docker-compose.yml b/docker-compose.yml index a09c43c..2912c9e 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,58 +1,66 @@ +version: '3.8' + services: - # 🐘 Cassandra Database Container cassandra: - image: cassandra:latest - container_name: cassandra-db + image: cassandra:4.1 + container_name: cassandra ports: - # Map the CQL native protocol port (required for client connections) - - '9042:9042' - volumes: - # Optional: Persist data outside the container - - ./docker/cassandra:/var/lib/cassandra + - "9042:9042" environment: - # Set the cluster name (optional, but good practice) - - CASSANDRA_CLUSTER_NAME=MainCluster - # Set the IP address of the node for other nodes to connect to - - CASSANDRA_BROADCAST_ADDRESS=cassandra-db - # Set the seed provider to itself for a single-node setup - - CASSANDRA_SEEDS=cassandra-db + - CASSANDRA_CLUSTER_NAME=EventStoreCluster + - CASSANDRA_DC=dc1 + - CASSANDRA_RACK=rack1 + - CASSANDRA_ENDPOINT_SNITCH=GossipingPropertyFileSnitch + - MAX_HEAP_SIZE=512M + - HEAP_NEWSIZE=128M + volumes: + - ./docker/cassandra/data:/var/lib/cassandra + - ./docker/cassandra/commitlog:/var/lib/cassandra/commitlog + - ./docker/cassandra/hints:/var/lib/cassandra/hints + - ./docker/cassandra/saved_caches:/var/lib/cassandra/saved_caches healthcheck: - test: [ "CMD-SHELL", "cqlsh -e 'describe cluster' || exit 1" ] + test: ["CMD-SHELL", "cqlsh -e 'describe cluster'"] interval: 30s timeout: 10s retries: 5 - start_period: 60s - restart: always + networks: + - app-network - # PHP-FPM Container - app: + php: build: context: ./docker/php - dockerfile: Docker - container_name: php-app - # Mount the source code into the container's web root + dockerfile: Dockerfile + container_name: php-fpm volumes: - - ./src:/var/www/html - # PHP-FPM runs on port 9000 by default - expose: - - '9000' + - ./src:/var/www/html/src + - ./vendor:/var/www/html/vendor + - ./logs:/var/www/html/logs + - ./composer.json:/var/www/html/composer.json + - ./.env:/var/www/html/.env + environment: + - APP_ENV=${APP_ENV:-production} + - APP_DEBUG=${APP_DEBUG:-false} + - CASSANDRA_HOST=cassandra + - CASSANDRA_PORT=9042 depends_on: cassandra: condition: service_healthy - restart: always + networks: + - app-network - # Nginx Web Server Container nginx: - image: nginx:stable-alpine - container_name: nginx-web + image: nginx:alpine + container_name: nginx ports: - # Map host port 10010 to container port 80 - - '10010:80' + - "8080:80" volumes: - # Mount the source code (same path as PHP-FPM) - - ./src:/var/www/html - # Override Nginx default configuration + - ./src:/var/www/html/src - ./docker/nginx/default.conf:/etc/nginx/conf.d/default.conf depends_on: - - app - restart: always + - php + networks: + - app-network + +networks: + app-network: + driver: bridge diff --git a/docker/cassandra/init.cql b/docker/cassandra/init.cql new file mode 100644 index 0000000..5eb230e --- /dev/null +++ b/docker/cassandra/init.cql @@ -0,0 +1,24 @@ +-- Create keyspace +CREATE KEYSPACE IF NOT EXISTS event_store +WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}; + +USE event_store; + +-- Create events table +CREATE TABLE IF NOT EXISTS events ( + aggregate_id text, + version int, + event_type text, + payload text, + created_at timestamp, + PRIMARY KEY (aggregate_id, version) +) WITH CLUSTERING ORDER BY (version ASC) + AND comment = 'Event store for domain events' + AND compaction = {'class': 'LeveledCompactionStrategy'} + AND gc_grace_seconds = 864000; + +-- Create index on event_type for querying +CREATE INDEX IF NOT EXISTS events_event_type_idx ON events (event_type); + +-- Create index on created_at for time-based queries +CREATE INDEX IF NOT EXISTS events_created_at_idx ON events (created_at); diff --git a/docker/nginx/default.conf b/docker/nginx/default.conf index 88aa141..08b94f2 100644 --- a/docker/nginx/default.conf +++ b/docker/nginx/default.conf @@ -26,7 +26,7 @@ server { # 3. Block to pass PHP scripts to PHP-FPM location ~ \.php$ { # This remains the same to execute any file ending in .php - fastcgi_pass app:9000; # 'app' is the name of your PHP-FPM service + fastcgi_pass php:9000; # 'php' is the name of your PHP-FPM service fastcgi_index index.php; include fastcgi_params; fastcgi_param SCRIPT_FILENAME $document_root$fastcgi_script_name; diff --git a/docker/php/Dockerfile b/docker/php/Dockerfile new file mode 100644 index 0000000..e69de29 diff --git a/scripts/init-cassandra.sh b/scripts/init-cassandra.sh new file mode 100644 index 0000000..511e31f --- /dev/null +++ b/scripts/init-cassandra.sh @@ -0,0 +1,12 @@ +#!/bin/bash + +echo "Waiting for Cassandra to be ready..." +until docker exec cassandra cqlsh -e "describe cluster" > /dev/null 2>&1; do + echo "Cassandra is unavailable - sleeping" + sleep 5 +done + +echo "Cassandra is up - initializing schema" +docker exec -i cassandra cqlsh < docker/cassandra/init.cql + +echo "Cassandra schema initialized successfully" diff --git a/src/Application/Handlers/RegisterCarrierHandler.php b/src/Application/Handlers/RegisterCarrierHandler.php index 15a9e09..71b9fa6 100644 --- a/src/Application/Handlers/RegisterCarrierHandler.php +++ b/src/Application/Handlers/RegisterCarrierHandler.php @@ -5,24 +5,62 @@ namespace DistributingCarriers\Application\Handlers; use DistributingCarriers\Application\Commands\RegisterCarrierCommand; use DistributingCarriers\Domain\Aggregates\Carrier; use DistributingCarriers\Infrastructure\EventSourcing\EventStore; +use DistributingCarriers\Infrastructure\Logging\Logger; +use Ramsey\Uuid\Uuid; class RegisterCarrierHandler { - private $eventStore; + private EventStore $eventStore; public function __construct(EventStore $eventStore) { $this->eventStore = $eventStore; } - public function __invoke(RegisterCarrierCommand $command): void + public function __invoke(RegisterCarrierCommand $command): array { - $carrierId = uniqid('carrier_'); - $carrier = Carrier::register($carrierId, $command->name, $command->email); + $logger = Logger::getInstance(); + + try { + // Generate proper UUID for carrier + $carrierId = Uuid::uuid4()->toString(); + + $logger->debug('Creating carrier aggregate', [ + 'carrier_id' => $carrierId, + 'name' => $command->name + ]); - $this->eventStore->save($carrier->getId(), $carrier->getUncommittedChanges(), $carrier->getVersion()); - $carrier->markChangesAsCommitted(); + // Create carrier aggregate + $carrier = Carrier::register($carrierId, $command->name, $command->email); - echo "Carrier registered with ID: $carrierId\n"; + // Persist events + $this->eventStore->save( + $carrier->getId(), + $carrier->getUncommittedChanges(), + $carrier->getVersion() + ); + + $carrier->markChangesAsCommitted(); + + $logger->info('Carrier registered successfully', [ + 'carrier_id' => $carrierId, + 'name' => $command->name, + 'email' => $command->email + ]); + + return [ + 'carrier_id' => $carrierId, + 'name' => $command->name, + 'email' => $command->email + ]; + + } catch (\Exception $e) { + $logger->error('Failed to register carrier', [ + 'error' => $e->getMessage(), + 'name' => $command->name, + 'email' => $command->email + ]); + throw $e; + } } } diff --git a/src/Application/Http/GetCarrierHandler.php b/src/Application/Http/GetCarrierHandler.php new file mode 100644 index 0000000..d9bc3cf --- /dev/null +++ b/src/Application/Http/GetCarrierHandler.php @@ -0,0 +1,63 @@ +eventStore = $eventStore; + $this->carrierId = $carrierId; + } + + public function handle(): void + { + $logger = Logger::getInstance(); + + try { + $logger->debug('Fetching carrier', ['carrier_id' => $this->carrierId]); + + $events = $this->eventStore->getEventsForAggregate($this->carrierId); + + if (empty($events)) { + $logger->info('Carrier not found', ['carrier_id' => $this->carrierId]); + JsonResponse::error('Carrier not found', 404); + return; + } + + $carrier = new Carrier(); + $carrier->loadFromHistory($events); + + $reflection = new \ReflectionClass($carrier); + $nameProperty = $reflection->getProperty('name'); + $nameProperty->setAccessible(true); + $emailProperty = $reflection->getProperty('email'); + $emailProperty->setAccessible(true); + + JsonResponse::success([ + 'carrier_id' => $carrier->getId(), + 'name' => $nameProperty->getValue($carrier), + 'email' => $emailProperty->getValue($carrier), + 'version' => $carrier->getVersion() + ]); + + } catch (\Exception $e) { + $logger->error('Failed to fetch carrier', [ + 'carrier_id' => $this->carrierId, + 'error' => $e->getMessage() + ]); + JsonResponse::error('Failed to fetch carrier', 500); + } + } +} diff --git a/src/Application/Http/HealthCheckHandler.php b/src/Application/Http/HealthCheckHandler.php new file mode 100644 index 0000000..df755f9 --- /dev/null +++ b/src/Application/Http/HealthCheckHandler.php @@ -0,0 +1,45 @@ +session = $session; + } + + public function handle(): void + { + $health = [ + 'status' => 'healthy', + 'timestamp' => date('c'), + 'checks' => [] + ]; + + // Check Cassandra connection + if ($this->session !== null) { + try { + $this->session->execute(new \Cassandra\SimpleStatement('SELECT now() FROM system.local')); + $health['checks']['cassandra'] = 'healthy'; + } catch (\Exception $e) { + $health['checks']['cassandra'] = 'unhealthy'; + $health['status'] = 'degraded'; + } + } + + // Check logs directory + $logsDir = __DIR__ . '/../../../logs'; + $health['checks']['logs'] = is_writable($logsDir) ? 'healthy' : 'unhealthy'; + + $statusCode = $health['status'] === 'healthy' ? 200 : 503; + JsonResponse::success($health, $statusCode); + } +} diff --git a/src/Application/Http/RegisterCarrierRequestHandler.php b/src/Application/Http/RegisterCarrierRequestHandler.php new file mode 100644 index 0000000..a6f2f76 --- /dev/null +++ b/src/Application/Http/RegisterCarrierRequestHandler.php @@ -0,0 +1,103 @@ +commandBus = $commandBus; + } + + public function handle(): void + { + $logger = Logger::getInstance(); + + try { + // Read and validate request size + $maxSize = (int)(getenv('MAX_REQUEST_SIZE') ?: 1048576); // 1MB default + $contentLength = (int)($_SERVER['CONTENT_LENGTH'] ?? 0); + + if ($contentLength > $maxSize) { + $logger->warning('Request payload too large', ['size' => $contentLength]); + JsonResponse::error('Request payload too large', 413); + return; + } + + // Read raw input + $rawInput = file_get_contents('php://input'); + + if ($rawInput === false || $rawInput === '') { + $logger->warning('Empty request body received'); + JsonResponse::error('Request body is required', 400); + return; + } + + // Parse JSON + $input = json_decode($rawInput, true); + + if (json_last_error() !== JSON_ERROR_NONE) { + $logger->warning('Invalid JSON received', ['error' => json_last_error_msg()]); + JsonResponse::error('Invalid JSON: ' . json_last_error_msg(), 400); + return; + } + + // Validate input + $validator = new Validator(); + $validator + ->validateRequired($input, ['name', 'email']) + ->validateEmail($input, 'email') + ->validateMinLength($input, 'name', 2) + ->validateMaxLength($input, 'name', 255) + ->validateMaxLength($input, 'email', 255); + + if ($validator->fails()) { + $logger->info('Validation failed', ['errors' => $validator->getErrors()]); + JsonResponse::error('Validation failed', 422, $validator->getErrors()); + return; + } + + // Sanitize input + $name = trim($input['name']); + $email = strtolower(trim($input['email'])); + + $logger->info('Registering carrier', ['name' => $name, 'email' => $email]); + + // Dispatch command + $command = new RegisterCarrierCommand($name, $email); + $result = $this->commandBus->dispatch($command); + + $logger->info('Carrier registered successfully', ['carrier_id' => $result['carrier_id'] ?? null]); + + JsonResponse::success([ + 'message' => 'Carrier registered successfully', + 'carrier_id' => $result['carrier_id'] ?? null + ], 201); + + } catch (ValidationException $e) { + $logger->warning('Validation exception', ['errors' => $e->getErrors()]); + JsonResponse::error($e->getMessage(), 422, $e->getErrors()); + } catch (\Exception $e) { + $logger->error('Unexpected error during carrier registration', [ + 'error' => $e->getMessage(), + 'trace' => $e->getTraceAsString() + ]); + + if (getenv('APP_DEBUG') === 'true') { + JsonResponse::error('Internal server error: ' . $e->getMessage(), 500); + } else { + JsonResponse::error('An unexpected error occurred', 500); + } + } + } +} diff --git a/src/Domain/ValueObjects/CarrierName.php b/src/Domain/ValueObjects/CarrierName.php new file mode 100644 index 0000000..1b97c4b --- /dev/null +++ b/src/Domain/ValueObjects/CarrierName.php @@ -0,0 +1,40 @@ + 255) { + throw new DomainException("Carrier name must not exceed 255 characters"); + } + + $this->value = $name; + } + + public function getValue(): string + { + return $this->value; + } + + public function __toString(): string + { + return $this->value; + } + + public function equals(CarrierName $other): bool + { + return $this->value === $other->value; + } +} diff --git a/src/Domain/ValueObjects/Email.php b/src/Domain/ValueObjects/Email.php new file mode 100644 index 0000000..7098a68 --- /dev/null +++ b/src/Domain/ValueObjects/Email.php @@ -0,0 +1,36 @@ +value = $email; + } + + public function getValue(): string + { + return $this->value; + } + + public function __toString(): string + { + return $this->value; + } + + public function equals(Email $other): bool + { + return $this->value === $other->value; + } +} diff --git a/src/Infrastructure/EventSourcing/CassandraEventStore.php b/src/Infrastructure/EventSourcing/CassandraEventStore.php index ca54981..9f9439c 100644 --- a/src/Infrastructure/EventSourcing/CassandraEventStore.php +++ b/src/Infrastructure/EventSourcing/CassandraEventStore.php @@ -3,69 +3,167 @@ namespace DistributingCarriers\Infrastructure\EventSourcing; use Cassandra; +use DistributingCarriers\Infrastructure\Exceptions\ConcurrencyException; +use DistributingCarriers\Infrastructure\Logging\Logger; class CassandraEventStore implements EventStore { - private $session; + private Cassandra\Session $session; + private int $maxRetries = 3; - public function __construct($session) + public function __construct(Cassandra\Session $session) { $this->session = $session; } public function save(string $aggregateId, array $events, int $expectedVersion): void { - // In a real implementation, we would check the version for concurrency control. - // For this example, we'll just append. + $logger = Logger::getInstance(); + + if (empty($events)) { + $logger->warning('Attempted to save empty events array', ['aggregate_id' => $aggregateId]); + return; + } - $statement = $this->session->prepare( - "INSERT INTO events (aggregate_id, version, event_type, payload, created_at) VALUES (?, ?, ?, ?, ?)" - ); + try { + // Check current version for optimistic concurrency control + $currentVersion = $this->getCurrentVersion($aggregateId); + + if ($currentVersion !== $expectedVersion) { + throw new ConcurrencyException( + "Concurrency conflict for aggregate {$aggregateId}. " . + "Expected version {$expectedVersion}, but current version is {$currentVersion}" + ); + } - foreach ($events as $event) { - $version = ++$expectedVersion; - $payload = json_encode($event); - $eventType = get_class($event); - $createdAt = new Cassandra\Timestamp(); + $statement = $this->session->prepare( + "INSERT INTO events (aggregate_id, version, event_type, payload, created_at) VALUES (?, ?, ?, ?, ?)" + ); - $this->session->execute($statement, [ - 'arguments' => [ + $batch = new Cassandra\BatchStatement(Cassandra::BATCH_LOGGED); + + foreach ($events as $event) { + $version = ++$expectedVersion; + $eventType = \get_class($event); + $payload = json_encode($event, JSON_THROW_ON_ERROR); + $createdAt = new Cassandra\Timestamp(); + + $batch->add($statement, [ $aggregateId, $version, $eventType, $payload, $createdAt - ] + ]); + + $logger->debug('Event added to batch', [ + 'aggregate_id' => $aggregateId, + 'version' => $version, + 'event_type' => $eventType + ]); + } + + // Execute batch with retry logic + $this->executeWithRetry($batch); + + $logger->info('Events saved successfully', [ + 'aggregate_id' => $aggregateId, + 'event_count' => count($events), + 'final_version' => $expectedVersion ]); + + } catch (ConcurrencyException $e) { + $logger->warning('Concurrency conflict detected', [ + 'aggregate_id' => $aggregateId, + 'expected_version' => $expectedVersion + ]); + throw $e; + } catch (\Exception $e) { + $logger->error('Failed to save events', [ + 'aggregate_id' => $aggregateId, + 'error' => $e->getMessage() + ]); + throw new \RuntimeException("Failed to save events: " . $e->getMessage(), 0, $e); } } public function getEventsForAggregate(string $aggregateId): array + { + $logger = Logger::getInstance(); + + try { + $statement = $this->session->prepare( + "SELECT event_type, payload, version, created_at FROM events WHERE aggregate_id = ? ORDER BY version ASC" + ); + + $result = $this->session->execute($statement, ['arguments' => [$aggregateId]]); + + $events = []; + foreach ($result as $row) { + $eventType = $row['event_type']; + $payload = json_decode($row['payload'], true, 512, JSON_THROW_ON_ERROR); + + if (class_exists($eventType)) { + $events[] = new $eventType($payload); + } else { + $logger->warning('Event class not found', [ + 'event_type' => $eventType, + 'aggregate_id' => $aggregateId + ]); + } + } + + $logger->debug('Events loaded for aggregate', [ + 'aggregate_id' => $aggregateId, + 'event_count' => count($events) + ]); + + return $events; + + } catch (\Exception $e) { + $logger->error('Failed to load events', [ + 'aggregate_id' => $aggregateId, + 'error' => $e->getMessage() + ]); + throw new \RuntimeException("Failed to load events: " . $e->getMessage(), 0, $e); + } + } + + private function getCurrentVersion(string $aggregateId): int { $statement = $this->session->prepare( - "SELECT event_type, payload FROM events WHERE aggregate_id = ?" + "SELECT MAX(version) as max_version FROM events WHERE aggregate_id = ?" ); $result = $this->session->execute($statement, ['arguments' => [$aggregateId]]); + $row = $result->first(); - $events = []; - foreach ($result as $row) { - $eventType = $row['event_type']; - $payload = json_decode($row['payload'], true); + return $row['max_version'] ?? 0; + } - // Assuming we can reconstruct the event object from payload - // This is a simplified version. - if (class_exists($eventType)) { - // In a real app, you might use a serializer/deserializer - // Here we assume the event has a static fromArray or similar, or just public properties - // For simplicity, let's assume we just return the data or a generic object if class doesn't handle it - // But for the plan, let's try to instantiate. - // We'll leave it as an array or basic object for now if complex reconstruction is needed. - // Let's assume the event class has a constructor that takes the payload. - $events[] = new $eventType($payload); + private function executeWithRetry(Cassandra\BatchStatement $batch): void + { + $attempt = 0; + $lastException = null; + + while ($attempt < $this->maxRetries) { + try { + $this->session->execute($batch); + return; + } catch (Cassandra\Exception\WriteTimeoutException $e) { + $lastException = $e; + $attempt++; + + if ($attempt < $this->maxRetries) { + usleep(100000 * $attempt); // Exponential backoff: 100ms, 200ms, 300ms + } } } - return $events; + throw new \RuntimeException( + "Failed to execute batch after {$this->maxRetries} attempts", + 0, + $lastException + ); } } diff --git a/src/Infrastructure/Exceptions/ConcurrencyException.php b/src/Infrastructure/Exceptions/ConcurrencyException.php new file mode 100644 index 0000000..309d8f0 --- /dev/null +++ b/src/Infrastructure/Exceptions/ConcurrencyException.php @@ -0,0 +1,7 @@ +errors = $errors; + } + + public function getErrors(): array + { + return $this->errors; + } +} diff --git a/src/Infrastructure/Http/JsonResponse.php b/src/Infrastructure/Http/JsonResponse.php new file mode 100644 index 0000000..eade0dc --- /dev/null +++ b/src/Infrastructure/Http/JsonResponse.php @@ -0,0 +1,35 @@ + true, 'data' => $data], $statusCode); + } + + public static function error(string $message, int $statusCode = 400, ?array $details = null): void + { + $response = [ + 'success' => false, + 'error' => [ + 'message' => $message, + 'code' => $statusCode + ] + ]; + + if ($details !== null) { + $response['error']['details'] = $details; + } + + self::send($response, $statusCode); + } + + private static function send(array $data, int $statusCode): void + { + http_response_code($statusCode); + header('Content-Type: application/json; charset=utf-8'); + echo json_encode($data, JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES); + } +} diff --git a/src/Infrastructure/Logging/Logger.php b/src/Infrastructure/Logging/Logger.php new file mode 100644 index 0000000..ec525e4 --- /dev/null +++ b/src/Infrastructure/Logging/Logger.php @@ -0,0 +1,38 @@ + MonologLogger::DEBUG, + 'info' => MonologLogger::INFO, + 'warning' => MonologLogger::WARNING, + 'error' => MonologLogger::ERROR, + default => MonologLogger::INFO, + }; + + $handler = new RotatingFileHandler(__DIR__ . '/../../../logs/app.log', 30, $logLevel); + $handler->setFormatter(new JsonFormatter()); + self::$instance->pushHandler($handler); + + if (getenv('APP_ENV') === 'development') { + self::$instance->pushHandler(new StreamHandler('php://stdout', MonologLogger::DEBUG)); + } + } + + return self::$instance; + } +} diff --git a/src/Infrastructure/Messaging/CommandBus.php b/src/Infrastructure/Messaging/CommandBus.php index 82c1de6..694069b 100644 --- a/src/Infrastructure/Messaging/CommandBus.php +++ b/src/Infrastructure/Messaging/CommandBus.php @@ -2,23 +2,42 @@ namespace DistributingCarriers\Infrastructure\Messaging; +use DistributingCarriers\Infrastructure\Logging\Logger; + class CommandBus { - private $handlers = []; + private array $handlers = []; public function register(string $commandClass, callable $handler): void { $this->handlers[$commandClass] = $handler; } - public function dispatch(object $command): void + public function dispatch(object $command): mixed { $commandClass = \get_class($command); + $logger = Logger::getInstance(); + if (!isset($this->handlers[$commandClass])) { + $logger->error('No handler registered for command', ['command' => $commandClass]); throw new \Exception("No handler registered for command: $commandClass"); } - $handler = $this->handlers[$commandClass]; - $handler($command); + $logger->debug('Dispatching command', ['command' => $commandClass]); + + try { + $handler = $this->handlers[$commandClass]; + $result = $handler($command); + + $logger->debug('Command dispatched successfully', ['command' => $commandClass]); + + return $result; + } catch (\Exception $e) { + $logger->error('Command dispatch failed', [ + 'command' => $commandClass, + 'error' => $e->getMessage() + ]); + throw $e; + } } } diff --git a/src/Infrastructure/Middleware/CorsMiddleware.php b/src/Infrastructure/Middleware/CorsMiddleware.php new file mode 100644 index 0000000..c744e45 --- /dev/null +++ b/src/Infrastructure/Middleware/CorsMiddleware.php @@ -0,0 +1,28 @@ +handle(); + } +} diff --git a/src/Infrastructure/Middleware/RateLimitMiddleware.php b/src/Infrastructure/Middleware/RateLimitMiddleware.php new file mode 100644 index 0000000..0a21022 --- /dev/null +++ b/src/Infrastructure/Middleware/RateLimitMiddleware.php @@ -0,0 +1,71 @@ +maxRequests = $maxRequests; + $this->windowSeconds = $windowSeconds; + $this->storageDir = sys_get_temp_dir() . '/rate_limits'; + + if (!is_dir($this->storageDir)) { + mkdir($this->storageDir, 0777, true); + } + } + + public function process(IRequestHandler $handler): void + { + $clientIp = $_SERVER['REMOTE_ADDR'] ?? 'unknown'; + $key = md5($clientIp); + $file = $this->storageDir . '/' . $key; + + $now = time(); + $requests = $this->getRequests($file, $now); + + if (\count($requests) >= $this->maxRequests) { + $logger = Logger::getInstance(); + $logger->warning('Rate limit exceeded', ['ip' => $clientIp]); + + header('X-RateLimit-Limit: ' . $this->maxRequests); + header('X-RateLimit-Remaining: 0'); + header('X-RateLimit-Reset: ' . ($now + $this->windowSeconds)); + + JsonResponse::error('Rate limit exceeded', 429); + exit; + } + + $requests[] = $now; + file_put_contents($file, json_encode($requests)); + + header('X-RateLimit-Limit: ' . $this->maxRequests); + header('X-RateLimit-Remaining: ' . ($this->maxRequests - \count($requests))); + header('X-RateLimit-Reset: ' . ($now + $this->windowSeconds)); + + $handler->handle(); + } + + private function getRequests(string $file, int $now): array + { + if (!file_exists($file)) { + return []; + } + + $content = file_get_contents($file); + $requests = json_decode($content, true) ?: []; + + return array_filter($requests, fn($timestamp) => $timestamp > ($now - $this->windowSeconds)); + } +} diff --git a/src/Infrastructure/Middleware/RequestIdMiddleware.php b/src/Infrastructure/Middleware/RequestIdMiddleware.php new file mode 100644 index 0000000..be3dee6 --- /dev/null +++ b/src/Infrastructure/Middleware/RequestIdMiddleware.php @@ -0,0 +1,23 @@ +toString(); + + header("X-Request-ID: $requestId"); + + $_SERVER['REQUEST_ID'] = $requestId; + + $handler->handle(); + } +} diff --git a/src/Infrastructure/Validation/Validator.php b/src/Infrastructure/Validation/Validator.php new file mode 100644 index 0000000..46ed9f4 --- /dev/null +++ b/src/Infrastructure/Validation/Validator.php @@ -0,0 +1,52 @@ +errors[$field] = "The {$field} field is required"; + } + } + return $this; + } + + public function validateEmail(array $data, string $field): self + { + if (isset($data[$field]) && !filter_var($data[$field], FILTER_VALIDATE_EMAIL)) { + $this->errors[$field] = "The {$field} must be a valid email address"; + } + return $this; + } + + public function validateMaxLength(array $data, string $field, int $maxLength): self + { + if (isset($data[$field]) && strlen($data[$field]) > $maxLength) { + $this->errors[$field] = "The {$field} must not exceed {$maxLength} characters"; + } + return $this; + } + + public function validateMinLength(array $data, string $field, int $minLength): self + { + if (isset($data[$field]) && strlen($data[$field]) < $minLength) { + $this->errors[$field] = "The {$field} must be at least {$minLength} characters"; + } + return $this; + } + + public function fails(): bool + { + return !empty($this->errors); + } + + public function getErrors(): array + { + return $this->errors; + } +} diff --git a/src/index.php b/src/index.php index 7a18061..168d460 100644 --- a/src/index.php +++ b/src/index.php @@ -1,5 +1,7 @@ withContactPoints(getenv('CASSANDRA_HOST') ?: 'cassandra') - ->withPort((int)(getenv('CASSANDRA_PORT') ?: 9042)) - ->build(); +// Load environment variables +if (file_exists(__DIR__ . '/../.env')) { + $dotenv = Dotenv::createImmutable(__DIR__ . '/..'); + $dotenv->load(); +} -$session = $cluster->connect('event_store'); +// Set error handling +error_reporting(E_ALL); +ini_set('display_errors', getenv('APP_DEBUG') === 'true' ? '1' : '0'); +ini_set('log_errors', '1'); +ini_set('error_log', __DIR__ . '/../logs/php_errors.log'); -// Initialize Infrastructure -$eventStore = new CassandraEventStore($session); -$commandBus = new CommandBus(); +// Set default timezone +date_default_timezone_set('UTC'); -// Register Handlers -$commandBus->register(RegisterCarrierCommand::class, new RegisterCarrierHandler($eventStore)); +// Global exception handler +set_exception_handler(function (Throwable $e) { + $logger = Logger::getInstance(); + $logger->error('Uncaught exception', [ + 'message' => $e->getMessage(), + 'file' => $e->getFile(), + 'line' => $e->getLine(), + 'trace' => $e->getTraceAsString() + ]); -// Router -$router = new Router(); - -$router->add('POST', '/register-carrier', function () use ($commandBus) { - $input = json_decode(file_get_contents('php://input'), true); - - if (!isset($input['name']) || !isset($input['email'])) { - http_response_code(400); - echo json_encode(['error' => 'Missing required fields: name, email']); - return; + if (getenv('APP_DEBUG') === 'true') { + JsonResponse::error('Internal server error: ' . $e->getMessage(), 500); + } else { + JsonResponse::error('An unexpected error occurred', 500); } - - $command = new RegisterCarrierCommand($input['name'], $input['email']); - $commandBus->dispatch($command); - - http_response_code(201); - echo json_encode(['message' => 'Carrier registered successfully']); }); -$method = $_SERVER['REQUEST_METHOD'] ?? 'GET'; -$uri = parse_url($_SERVER['REQUEST_URI'] ?? '/', PHP_URL_PATH); +// Global error handler +set_error_handler(function (int $errno, string $errstr, string $errfile, int $errline) { + if (!(error_reporting() & $errno)) { + return false; + } -$router->dispatch($method, $uri); + $logger = Logger::getInstance(); + $logger->error('PHP Error', [ + 'errno' => $errno, + 'errstr' => $errstr, + 'errfile' => $errfile, + 'errline' => $errline + ]); + + throw new ErrorException($errstr, 0, $errno, $errfile, $errline); +}); + +try { + $logger = Logger::getInstance(); + $logger->info('Application starting', [ + 'method' => $_SERVER['REQUEST_METHOD'] ?? 'UNKNOWN', + 'uri' => $_SERVER['REQUEST_URI'] ?? '/', + 'ip' => $_SERVER['REMOTE_ADDR'] ?? 'unknown' + ]); + + // Initialize Cassandra Connection with retry logic + $maxRetries = 3; + $retryDelay = 1000000; // 1 second + $cluster = null; + $session = null; + + for ($attempt = 1; $attempt <= $maxRetries; $attempt++) { + try { + $clusterBuilder = Cassandra::cluster() + ->withContactPoints(getenv('CASSANDRA_HOST') ?: 'cassandra') + ->withPort((int)(getenv('CASSANDRA_PORT') ?: 9042)) + ->withConnectTimeout(10) + ->withRequestTimeout(10); + + // Add credentials if provided + $username = getenv('CASSANDRA_USERNAME'); + $password = getenv('CASSANDRA_PASSWORD'); + if ($username && $password) { + $clusterBuilder->withCredentials($username, $password); + } + + $cluster = $clusterBuilder->build(); + $keyspace = getenv('CASSANDRA_KEYSPACE') ?: 'event_store'; + $session = $cluster->connect($keyspace); + + $logger->info('Connected to Cassandra', [ + 'keyspace' => $keyspace, + 'attempt' => $attempt + ]); + break; + + } catch (Exception $e) { + $logger->warning('Failed to connect to Cassandra', [ + 'attempt' => $attempt, + 'max_retries' => $maxRetries, + 'error' => $e->getMessage() + ]); + + if ($attempt === $maxRetries) { + throw new RuntimeException('Could not connect to Cassandra after ' . $maxRetries . ' attempts', 0, $e); + } + + usleep($retryDelay * $attempt); + } + } + + // Initialize Infrastructure + $eventStore = new CassandraEventStore($session); + $commandBus = new CommandBus(); + + // Register Handlers + $commandBus->register(RegisterCarrierCommand::class, new RegisterCarrierHandler($eventStore)); + + // Router + $router = new Router(); + + // Add global middlewares + $router->addGlobalMiddleware(new RequestIdMiddleware()); + $router->addGlobalMiddleware(new CorsMiddleware()); + + // Add routes + $router->add('GET', '/health', new HealthCheckHandler($session)); + $router->add('POST', '/register-carrier', new RegisterCarrierRequestHandler($commandBus), [ + new RateLimitMiddleware(100, 60) + ]); + + // Add GET carrier route with dynamic ID + $method = $_SERVER['REQUEST_METHOD'] ?? 'GET'; + $uri = parse_url($_SERVER['REQUEST_URI'] ?? '/', PHP_URL_PATH); + + // Handle GET /carriers/{id} + if ($method === 'GET' && preg_match('#^/carriers/([a-f0-9\-]{36})$#', $uri, $matches)) { + $carrierId = $matches[1]; + $handler = new GetCarrierHandler($eventStore, $carrierId); + $router->add('GET', $uri, $handler); + } + + $logger->debug('Dispatching request', [ + 'method' => $method, + 'uri' => $uri, + 'request_id' => $_SERVER['REQUEST_ID'] ?? 'unknown' + ]); + + $router->dispatch($method, $uri); + + $logger->info('Request completed successfully'); + +} catch (Throwable $e) { + $logger = Logger::getInstance(); + $logger->critical('Application failed to start', [ + 'error' => $e->getMessage(), + 'trace' => $e->getTraceAsString() + ]); + + if (getenv('APP_DEBUG') === 'true') { + JsonResponse::error('Application initialization failed: ' . $e->getMessage(), 500); + } else { + JsonResponse::error('Service temporarily unavailable', 503); + } +}