diff --git a/README.md b/README.md new file mode 100644 index 0000000..233fa84 --- /dev/null +++ b/README.md @@ -0,0 +1,3 @@ +# Distributing Carriers + +Distributing Carriers Application. diff --git a/composer.json b/composer.json new file mode 100644 index 0000000..7b8fc8e --- /dev/null +++ b/composer.json @@ -0,0 +1,20 @@ +{ + "name": "mamad/distributing-carriers", + "description": "Distributing Carriers Application with DDD, CQRS, and Event Sourcing", + "type": "project", + "require": { + "php": "^8.0", + "ext-cassandra": "*" + }, + "autoload": { + "psr-4": { + "DistributingCarriers\\": "src/" + } + }, + "authors": [ + { + "name": "Mamad", + "email": "mamad@example.com" + } + ] +} \ No newline at end of file diff --git a/src/Application/Commands/RegisterCarrierCommand.php b/src/Application/Commands/RegisterCarrierCommand.php new file mode 100644 index 0000000..d2ad7e0 --- /dev/null +++ b/src/Application/Commands/RegisterCarrierCommand.php @@ -0,0 +1,15 @@ +name = $name; + $this->email = $email; + } +} diff --git a/src/Application/Handlers/RegisterCarrierHandler.php b/src/Application/Handlers/RegisterCarrierHandler.php new file mode 100644 index 0000000..6084706 --- /dev/null +++ b/src/Application/Handlers/RegisterCarrierHandler.php @@ -0,0 +1,28 @@ +eventStore = $eventStore; + } + + public function __invoke(RegisterCarrierCommand $command): void + { + $carrierId = uniqid('carrier_'); + $carrier = Carrier::register($carrierId, $command->name, $command->email); + + $this->eventStore->save($carrier->getId(), $carrier->getUncommittedChanges(), $carrier->getVersion()); + $carrier->markChangesAsCommitted(); + + echo "Carrier registered with ID: " . $carrierId . "\n"; + } +} diff --git a/src/Domain/AggregateRoot.php b/src/Domain/AggregateRoot.php new file mode 100644 index 0000000..8bbaf90 --- /dev/null +++ b/src/Domain/AggregateRoot.php @@ -0,0 +1,52 @@ +id; + } + + public function getVersion(): int + { + return $this->version; + } + + public function getUncommittedChanges(): array + { + return $this->changes; + } + + public function markChangesAsCommitted(): void + { + $this->changes = []; + } + + protected function recordThat(object $event): void + { + $this->apply($event); + $this->changes[] = $event; + } + + protected function apply(object $event): void + { + $method = 'apply' . (new \ReflectionClass($event))->getShortName(); + if (method_exists($this, $method)) { + $this->$method($event); + } + $this->version++; + } + + public function loadFromHistory(array $history): void + { + foreach ($history as $event) { + $this->apply($event); + } + } +} diff --git a/src/Domain/Carrier.php b/src/Domain/Carrier.php new file mode 100644 index 0000000..c67a449 --- /dev/null +++ b/src/Domain/Carrier.php @@ -0,0 +1,29 @@ +recordThat(new CarrierRegistered([ + 'carrierId' => $carrierId, + 'name' => $name, + 'email' => $email + ])); + return $carrier; + } + + protected function applyCarrierRegistered(CarrierRegistered $event): void + { + $this->id = $event->carrierId; + $this->name = $event->name; + $this->email = $event->email; + } +} diff --git a/src/Domain/Events/CarrierRegistered.php b/src/Domain/Events/CarrierRegistered.php new file mode 100644 index 0000000..5f60079 --- /dev/null +++ b/src/Domain/Events/CarrierRegistered.php @@ -0,0 +1,17 @@ +carrierId = $payload['carrierId'] ?? null; + $this->name = $payload['name'] ?? null; + $this->email = $payload['email'] ?? null; + } +} diff --git a/src/Infrastructure/CassandraEventStore.php b/src/Infrastructure/CassandraEventStore.php new file mode 100644 index 0000000..8a9e09a --- /dev/null +++ b/src/Infrastructure/CassandraEventStore.php @@ -0,0 +1,71 @@ +session = $session; + } + + public function save(string $aggregateId, array $events, int $expectedVersion): void + { + // In a real implementation, we would check the version for concurrency control. + // For this example, we'll just append. + + $statement = $this->session->prepare( + "INSERT INTO events (aggregate_id, version, event_type, payload, created_at) VALUES (?, ?, ?, ?, ?)" + ); + + foreach ($events as $event) { + $version = ++$expectedVersion; + $payload = json_encode($event); + $eventType = get_class($event); + $createdAt = new Cassandra\Timestamp(); + + $this->session->execute($statement, [ + 'arguments' => [ + $aggregateId, + $version, + $eventType, + $payload, + $createdAt + ] + ]); + } + } + + public function getEventsForAggregate(string $aggregateId): array + { + $statement = $this->session->prepare( + "SELECT event_type, payload FROM events WHERE aggregate_id = ?" + ); + + $result = $this->session->execute($statement, ['arguments' => [$aggregateId]]); + + $events = []; + foreach ($result as $row) { + $eventType = $row['event_type']; + $payload = json_decode($row['payload'], true); + + // Assuming we can reconstruct the event object from payload + // This is a simplified version. + if (class_exists($eventType)) { + // In a real app, you might use a serializer/deserializer + // Here we assume the event has a static fromArray or similar, or just public properties + // For simplicity, let's assume we just return the data or a generic object if class doesn't handle it + // But for the plan, let's try to instantiate. + // We'll leave it as an array or basic object for now if complex reconstruction is needed. + // Let's assume the event class has a constructor that takes the payload. + $events[] = new $eventType($payload); + } + } + + return $events; + } +} diff --git a/src/Infrastructure/CommandBus.php b/src/Infrastructure/CommandBus.php new file mode 100644 index 0000000..05a0126 --- /dev/null +++ b/src/Infrastructure/CommandBus.php @@ -0,0 +1,24 @@ +handlers[$commandClass] = $handler; + } + + public function dispatch(object $command): void + { + $commandClass = get_class($command); + if (!isset($this->handlers[$commandClass])) { + throw new \Exception("No handler registered for command: " . $commandClass); + } + + $handler = $this->handlers[$commandClass]; + $handler($command); + } +} diff --git a/src/Infrastructure/EventStore.php b/src/Infrastructure/EventStore.php new file mode 100644 index 0000000..ec512cd --- /dev/null +++ b/src/Infrastructure/EventStore.php @@ -0,0 +1,9 @@ +routes[] = [ + 'method' => $method, + 'path' => $path, + 'handler' => $handler + ]; + } + + public function dispatch(string $method, string $uri) + { + foreach ($this->routes as $route) { + if ($route['method'] === $method && $route['path'] === $uri) { + return call_user_func($route['handler']); + } + } + + http_response_code(404); + echo "Not Found"; + } +} diff --git a/src/index.php b/src/index.php index e69de29..33300b3 100644 --- a/src/index.php +++ b/src/index.php @@ -0,0 +1,43 @@ +build(); +// $session = $cluster->connect('distributing_carriers'); + +// Mocking session for demonstration if extension is missing, or just placeholder. +$session = null; // Placeholder + +$eventStore = new CassandraEventStore($session); +$commandBus = new CommandBus(); + +// Register Handlers +$commandBus->register(RegisterCarrierCommand::class, new RegisterCarrierHandler($eventStore)); + +// Router +$router = new Router(); + +$router->add('POST', '/register-carrier', function () use ($commandBus) { + // In a real app, parse body. + // For demo, we'll just create a dummy command. + $command = new RegisterCarrierCommand("Test Carrier", "test@example.com"); + $commandBus->dispatch($command); +}); + + +$method = $_SERVER['REQUEST_METHOD'] ?? 'GET'; +$uri = $_SERVER['REQUEST_URI'] ?? '/'; + + +$router->dispatch($method, $uri); diff --git a/tests/demo.php b/tests/demo.php new file mode 100644 index 0000000..6fb4257 --- /dev/null +++ b/tests/demo.php @@ -0,0 +1,66 @@ +executedStatements[] = [ + 'statement' => $statement->cql, + 'arguments' => $options['arguments'] + ]; + echo "Executed CQL: " . $statement->cql . "\n"; + return []; + } + } + + class MockStatement + { + public $cql; + public function __construct($cql) + { + $this->cql = $cql; + } + } + + require_once __DIR__ . '/../src/Infrastructure/EventStore.php'; + require_once __DIR__ . '/../src/Infrastructure/CassandraEventStore.php'; + require_once __DIR__ . '/../src/Infrastructure/CommandBus.php'; + require_once __DIR__ . '/../src/Domain/AggregateRoot.php'; + require_once __DIR__ . '/../src/Domain/Events/CarrierRegistered.php'; + require_once __DIR__ . '/../src/Domain/Carrier.php'; + require_once __DIR__ . '/../src/Application/Commands/RegisterCarrierCommand.php'; + require_once __DIR__ . '/../src/Application/Handlers/RegisterCarrierHandler.php'; + + use DistributingCarriers\Infrastructure\CommandBus; + use DistributingCarriers\Infrastructure\CassandraEventStore; + use DistributingCarriers\Application\Commands\RegisterCarrierCommand; + use DistributingCarriers\Application\Handlers\RegisterCarrierHandler; + + echo "Starting Demo...\n"; + + $session = new MockCassandraSession(); + $eventStore = new CassandraEventStore($session); + $commandBus = new CommandBus(); + + $commandBus->register(RegisterCarrierCommand::class, new RegisterCarrierHandler($eventStore)); + + $command = new RegisterCarrierCommand("Demo Carrier", "demo@example.com"); + $commandBus->dispatch($command); + + echo "Demo Completed.\n"; +}