session = $session; }

    /**
     * Appends events for one aggregate to the `events` table.
     *
     * Every event becomes its own row. Row versions are numbered
     * consecutively, starting at $expectedVersion + 1. All events are encoded
     * before the first row is written, so an encoding failure does not leave
     * a partially written append.
     *
     * NOTE: no optimistic-concurrency check is performed here. In a real
     * implementation the expected version would be verified before writing;
     * this method only uses it as the numbering base and appends.
     *
     * @param string $aggregateId     Value stored in the aggregate_id column.
     * @param array  $events          Event objects; each one is JSON-encoded and
     *                                its class name is stored as event_type.
     * @param int    $expectedVersion Version preceding the first event written.
     *
     * @throws \RuntimeException When an event cannot be encoded as JSON.
     */
    public function save(string $aggregateId, array $events, int $expectedVersion): void
    {
        if ($events === []) {
            // Nothing to append, so do not prepare a statement at all.
            return;
        }

        // Encode everything up front so a bad event aborts before any write.
        $rows = [];
        $version = $expectedVersion;
        foreach ($events as $event) {
            $eventType = get_class($event);
            $payload = json_encode($event);
            if ($payload === false) {
                throw new \RuntimeException(
                    sprintf('Unable to encode event %s as JSON: %s', $eventType, json_last_error_msg())
                );
            }

            $rows[] = [++$version, $eventType, $payload];
        }

        $statement = $this->session->prepare(
            "INSERT INTO events (aggregate_id, version, event_type, payload, created_at) VALUES (?, ?, ?, ?, ?)"
        );

        foreach ($rows as [$rowVersion, $eventType, $payload]) {
            $this->session->execute($statement, [
                'arguments' => [
                    $aggregateId,
                    $rowVersion,
                    $eventType,
                    $payload,
                    // Fully qualified so the name resolves regardless of the
                    // namespace this file is declared in.
                    new \Cassandra\Timestamp(),
                ],
            ]);
        }
    }

    /**
     * Loads the stored events of one aggregate and rebuilds event objects.
     *
     * Rows are returned in the order the query yields them; the query has no
     * explicit ORDER BY. Each row's payload is JSON-decoded (objects become
     * associative arrays) and handed to the constructor of the class named in
     * event_type. This is a simplified reconstruction: it assumes every event
     * class has a constructor accepting the decoded payload. A real
     * application might use a dedicated serializer/deserializer instead.
     *
     * Rows whose event_type does not name an existing class are skipped.
     *
     * @param string $aggregateId Value matched against the aggregate_id column.
     *
     * @return array Reconstructed event objects.
     *
     * @throws \RuntimeException When a stored payload is not valid JSON.
     */
    public function getEventsForAggregate(string $aggregateId): array
    {
        $statement = $this->session->prepare(
            "SELECT event_type, payload FROM events WHERE aggregate_id = ?"
        );
        $result = $this->session->execute($statement, ['arguments' => [$aggregateId]]);

        $events = [];
        foreach ($result as $row) {
            $eventType = $row['event_type'];
            if (!class_exists($eventType)) {
                // Unknown event classes cannot be instantiated; skip the row.
                continue;
            }

            $payload = json_decode($row['payload'], true);
            // A literal JSON "null" also decodes to null, so consult the
            // error state to tell it apart from a corrupt payload.
            if ($payload === null && json_last_error() !== JSON_ERROR_NONE) {
                throw new \RuntimeException(
                    sprintf('Unable to decode payload of event %s: %s', $eventType, json_last_error_msg())
                );
            }

            $events[] = new $eventType($payload);
        }

        return $events;
    }
}