diff --git a/src/Repository/DefaultRepository.php b/src/Repository/DefaultRepository.php index 9091e5fb9..5ad7b278b 100644 --- a/src/Repository/DefaultRepository.php +++ b/src/Repository/DefaultRepository.php @@ -11,20 +11,13 @@ use Patchlevel\EventSourcing\Message\Message; use Patchlevel\EventSourcing\Metadata\AggregateRoot\AggregateRootMetadata; use Patchlevel\EventSourcing\Repository\MessageDecorator\MessageDecorator; +use Patchlevel\EventSourcing\Repository\StoreAdapter\StoreAdapter; use Patchlevel\EventSourcing\Snapshot\SnapshotNotFound; use Patchlevel\EventSourcing\Snapshot\SnapshotStore; use Patchlevel\EventSourcing\Snapshot\SnapshotVersionInvalid; -use Patchlevel\EventSourcing\Store\Criteria\ArchivedCriterion; -use Patchlevel\EventSourcing\Store\Criteria\Criteria; -use Patchlevel\EventSourcing\Store\Criteria\FromPlayheadCriterion; -use Patchlevel\EventSourcing\Store\Criteria\StreamCriterion; -use Patchlevel\EventSourcing\Store\Criteria\ToPlayheadCriterion; use Patchlevel\EventSourcing\Store\Header\PlayheadHeader; use Patchlevel\EventSourcing\Store\Header\RecordedOnHeader; -use Patchlevel\EventSourcing\Store\Header\StreamNameHeader; -use Patchlevel\EventSourcing\Store\Store; use Patchlevel\EventSourcing\Store\Stream; -use Patchlevel\EventSourcing\Store\StreamStartHeader; use Patchlevel\EventSourcing\Store\UniqueConstraintViolation; use Psr\Clock\ClockInterface; use Psr\Log\LoggerInterface; @@ -32,7 +25,6 @@ use Throwable; use Traversable; use WeakMap; - use function array_map; use function assert; use function count; @@ -52,7 +44,7 @@ final class DefaultRepository implements Repository /** @param AggregateRootMetadata $metadata */ public function __construct( - private readonly Store $store, + private readonly StoreAdapter $messageAdapter, private readonly AggregateRootMetadata $metadata, private readonly EventBus|null $eventBus = null, private readonly SnapshotStore|null $snapshotStore = null, @@ -110,15 +102,12 @@ public function load(Identifier $id): AggregateRoot } } - $criteria = new Criteria( - new StreamCriterion($this->metadata->streamName($id->toString())), - new ArchivedCriterion(false), - ); - $stream = null; try { - $stream = $this->store->load($criteria); + $stream = $this->messageAdapter->load( + $this->metadata->streamName($id->toString()), + ); $firstMessage = $stream->current(); @@ -163,11 +152,7 @@ public function load(Identifier $id): AggregateRoot public function has(Identifier $id): bool { - $criteria = new Criteria( - new StreamCriterion($this->metadata->streamName($id->toString())), - ); - - return $this->store->count($criteria) > 0; + return $this->messageAdapter->count($this->metadata->streamName($id->toString())) > 0; } /** @param T $aggregate */ @@ -223,18 +208,13 @@ public function save(AggregateRoot $aggregate): void $streamName = $this->metadata->streamName($aggregateId); - $archiveTo = null; - $messages = array_map( static function (object $event) use ( &$playhead, - &$archiveTo, $messageDecorator, $clock, - $streamName, ) { $message = Message::create($event) - ->withHeader(new StreamNameHeader($streamName)) ->withHeader(new PlayheadHeader(++$playhead)) ->withHeader(new RecordedOnHeader($clock->now())); @@ -242,31 +222,13 @@ static function (object $event) use ( $message = $messageDecorator($message); } - if ($message->hasHeader(StreamStartHeader::class)) { - $archiveTo = $playhead; - } - return $message; }, $events, ); try { - if ($archiveTo !== null) { - $this->store->transactional( - function () use ($messages, $streamName, $archiveTo): void { - $this->store->save(...$messages); - $this->store->archive( - new Criteria( - new StreamCriterion($streamName), - new ToPlayheadCriterion($archiveTo), - ), - ); - }, - ); - } else { - $this->store->save(...$messages); - } + $this->messageAdapter->write($streamName, $messages); } catch (UniqueConstraintViolation) { if ($newAggregate) { $this->logger->error( @@ -320,15 +282,10 @@ private function loadFromSnapshot(string $aggregateClass, Identifier $id): Aggre $aggregate = $this->snapshotStore->load($aggregateClass, $id); - $criteria = new Criteria( - new StreamCriterion($this->metadata->streamName($id->toString())), - new FromPlayheadCriterion($aggregate->playhead()), - ); - $stream = null; try { - $stream = $this->store->load($criteria); + $stream = $this->messageAdapter->load($this->metadata->streamName($id->toString()), $aggregate->playhead()); if ($stream->current() === null) { $this->aggregateIsValid[$aggregate] = true; diff --git a/src/Repository/DefaultRepositoryManager.php b/src/Repository/DefaultRepositoryManager.php index e208e167f..b6bf3753c 100644 --- a/src/Repository/DefaultRepositoryManager.php +++ b/src/Repository/DefaultRepositoryManager.php @@ -12,12 +12,11 @@ use Patchlevel\EventSourcing\Metadata\AggregateRoot\AggregateRootMetadataFactory; use Patchlevel\EventSourcing\Metadata\AggregateRoot\AggregateRootRegistry; use Patchlevel\EventSourcing\Repository\MessageDecorator\MessageDecorator; +use Patchlevel\EventSourcing\Repository\StoreAdapter\StoreAdapter; use Patchlevel\EventSourcing\Snapshot\SnapshotStore; -use Patchlevel\EventSourcing\Store\Store; use Psr\Clock\ClockInterface; use Psr\Log\LoggerInterface; use Psr\Log\NullLogger; - use function array_key_exists; final class DefaultRepositoryManager implements RepositoryManager @@ -31,7 +30,7 @@ final class DefaultRepositoryManager implements RepositoryManager public function __construct( private readonly AggregateRootRegistry $aggregateRootRegistry, - private readonly Store $store, + private readonly StoreAdapter $storeAdapter, private readonly EventBus|null $eventBus = null, private readonly SnapshotStore|null $snapshotStore = null, private readonly MessageDecorator|null $messageDecorator = null, @@ -65,7 +64,7 @@ public function get(string $aggregateClass): Repository } return $this->instances[$aggregateClass] = new DefaultRepository( - $this->store, + $this->storeAdapter, $this->metadataFactory->metadata($aggregateClass), $this->eventBus, $this->snapshotStore, diff --git a/src/Repository/StoreAdapter/StoreAdapter.php b/src/Repository/StoreAdapter/StoreAdapter.php new file mode 100644 index 000000000..25bdb466e --- /dev/null +++ b/src/Repository/StoreAdapter/StoreAdapter.php @@ -0,0 +1,18 @@ + $messages + */ + public function write(string $stream, iterable $messages): void; +} \ No newline at end of file diff --git a/src/Repository/StoreAdapter/StreamDoctrineDbalStoreAdapter.php b/src/Repository/StoreAdapter/StreamDoctrineDbalStoreAdapter.php new file mode 100644 index 000000000..25526f816 --- /dev/null +++ b/src/Repository/StoreAdapter/StreamDoctrineDbalStoreAdapter.php @@ -0,0 +1,93 @@ +add(new FromPlayheadCriterion($fromPlayhead)); + } + + return $this->store->load($criteria); + } + + public function count(string $stream): int + { + $criteria = new Criteria( + new StreamCriterion($stream), + ); + + return $this->store->count($criteria); + } + + /** + * @param iterable $messages + */ + public function write(string $stream, iterable $messages): void + { + $archiveTo = null; + + $this->store->save( + ...array_map( + static function (Message $message) use ( + $stream, + &$archiveTo + ) { + if ($message->hasHeader(StreamStartHeader::class)) { + try { + $archiveTo = $message->header(PlayheadHeader::class)->playhead; + } catch (Throwable) { + } + } + + return $message->withHeader(new StreamNameHeader($stream)); + }, + $messages, + ) + ); + + if ($archiveTo === null) { + $this->store->save(...$messages); + + return; + } + + $this->store->transactional( + static function () use ($stream, $archiveTo, $messages): void { + $this->store->save(...$messages); + + $this->store->archive( + new Criteria( + new StreamCriterion($stream), + new ToPlayheadCriterion($archiveTo), + ), + ); + } + ); + } +} \ No newline at end of file diff --git a/src/Repository/StoreAdapter/TaggableDoctrineDbalStoreAdapter.php b/src/Repository/StoreAdapter/TaggableDoctrineDbalStoreAdapter.php new file mode 100644 index 000000000..5c4d7d233 --- /dev/null +++ b/src/Repository/StoreAdapter/TaggableDoctrineDbalStoreAdapter.php @@ -0,0 +1,94 @@ +add(new FromPlayheadCriterion($fromPlayhead)); + } + + return $this->store->load($criteria); + } + + public function count(string $stream): int + { + $criteria = new Criteria( + new StreamCriterion($stream), + ); + + return $this->store->count($criteria); + } + + /** + * @param iterable $messages + */ + public function write(string $stream, iterable $messages): void + { + $archiveTo = null; + + $this->store->save( + ...array_map( + static function (Message $message) use ( + $stream, + &$archiveTo + ) { + if ($message->hasHeader(StreamStartHeader::class)) { + try { + $archiveTo = $message->header(PlayheadHeader::class)->playhead; + } catch (Throwable) { + } + } + + return $message->withHeader(new StreamNameHeader($stream)); + }, + $messages, + ) + ); + + if ($archiveTo === null) { + $this->store->save(...$messages); + + return; + } + + $this->store->transactional( + static function () use ($stream, $archiveTo, $messages): void { + $this->store->save(...$messages); + + $this->store->archive( + new Criteria( + new StreamCriterion($stream), + new ToPlayheadCriterion($archiveTo), + ), + ); + } + ); + } +} \ No newline at end of file diff --git a/tests/Benchmark/CommandToQueryBench.php b/tests/Benchmark/CommandToQueryBench.php index e6695a4e1..8f43035bd 100644 --- a/tests/Benchmark/CommandToQueryBench.php +++ b/tests/Benchmark/CommandToQueryBench.php @@ -13,6 +13,7 @@ use Patchlevel\EventSourcing\QueryBus\ServiceHandlerProvider; use Patchlevel\EventSourcing\QueryBus\SyncQueryBus; use Patchlevel\EventSourcing\Repository\DefaultRepositoryManager; +use Patchlevel\EventSourcing\Repository\StoreAdapter\StreamDoctrineDbalStoreAdapter; use Patchlevel\EventSourcing\Schema\DoctrineSchemaDirector; use Patchlevel\EventSourcing\Serializer\DefaultEventSerializer; use Patchlevel\EventSourcing\Snapshot\Adapter\InMemorySnapshotAdapter; @@ -56,7 +57,7 @@ public function setUp(): void $manager = new DefaultRepositoryManager( $aggregateRootRegistry, - $store, + new StreamDoctrineDbalStoreAdapter($store), null, new DefaultSnapshotStore(['default' => new InMemorySnapshotAdapter()]), ); diff --git a/tests/Benchmark/PersonalDataBench.php b/tests/Benchmark/PersonalDataBench.php index 0e7b93a92..efb808d5d 100644 --- a/tests/Benchmark/PersonalDataBench.php +++ b/tests/Benchmark/PersonalDataBench.php @@ -8,6 +8,7 @@ use Patchlevel\EventSourcing\Identifier\Identifier; use Patchlevel\EventSourcing\Repository\DefaultRepository; use Patchlevel\EventSourcing\Repository\Repository; +use Patchlevel\EventSourcing\Repository\StoreAdapter\StreamDoctrineDbalStoreAdapter; use Patchlevel\EventSourcing\Schema\ChainDoctrineSchemaConfigurator; use Patchlevel\EventSourcing\Schema\DoctrineSchemaDirector; use Patchlevel\EventSourcing\Serializer\DefaultEventSerializer; @@ -46,7 +47,10 @@ public function setUp(): void ), ); - $this->repository = new DefaultRepository($this->store, Profile::metadata()); + $this->repository = new DefaultRepository( + new StreamDoctrineDbalStoreAdapter($this->store), + Profile::metadata() + ); $schemaDirector = new DoctrineSchemaDirector( $connection, diff --git a/tests/Benchmark/SimpleSetupStreamStoreBench.php b/tests/Benchmark/SimpleSetupStreamStoreBench.php index 2b3397355..df99923d3 100644 --- a/tests/Benchmark/SimpleSetupStreamStoreBench.php +++ b/tests/Benchmark/SimpleSetupStreamStoreBench.php @@ -7,6 +7,7 @@ use Patchlevel\EventSourcing\Identifier\Identifier; use Patchlevel\EventSourcing\Repository\DefaultRepository; use Patchlevel\EventSourcing\Repository\Repository; +use Patchlevel\EventSourcing\Repository\StoreAdapter\StreamDoctrineDbalStoreAdapter; use Patchlevel\EventSourcing\Schema\DoctrineSchemaDirector; use Patchlevel\EventSourcing\Serializer\DefaultEventSerializer; use Patchlevel\EventSourcing\Store\Store; @@ -34,7 +35,10 @@ public function setUp(): void DefaultEventSerializer::createFromPaths([__DIR__ . '/BasicImplementation/Events']), ); - $this->repository = new DefaultRepository($this->store, Profile::metadata()); + $this->repository = new DefaultRepository( + new StreamDoctrineDbalStoreAdapter($this->store), + Profile::metadata() + ); $schemaDirector = new DoctrineSchemaDirector( $connection, diff --git a/tests/Benchmark/SimpleSetupTaggableStoreBench.php b/tests/Benchmark/SimpleSetupTaggableStoreBench.php index b0759142f..8bdcf1bcb 100644 --- a/tests/Benchmark/SimpleSetupTaggableStoreBench.php +++ b/tests/Benchmark/SimpleSetupTaggableStoreBench.php @@ -9,6 +9,7 @@ use Patchlevel\EventSourcing\Metadata\Event\AttributeEventRegistryFactory; use Patchlevel\EventSourcing\Repository\DefaultRepository; use Patchlevel\EventSourcing\Repository\Repository; +use Patchlevel\EventSourcing\Repository\StoreAdapter\TaggableDoctrineDbalStoreAdapter; use Patchlevel\EventSourcing\Schema\DoctrineSchemaDirector; use Patchlevel\EventSourcing\Serializer\DefaultEventSerializer; use Patchlevel\EventSourcing\Store\TaggableDoctrineDbalStore; @@ -37,7 +38,10 @@ public function setUp(): void (new AttributeEventRegistryFactory())->create([__DIR__ . '/BasicImplementation/Events']), ); - $this->repository = new DefaultRepository($this->store, Profile::metadata()); + $this->repository = new DefaultRepository( + new TaggableDoctrineDbalStoreAdapter($this->store), + Profile::metadata() + ); $schemaDirector = new DoctrineSchemaDirector( $connection, diff --git a/tests/Benchmark/SnapshotsBench.php b/tests/Benchmark/SnapshotsBench.php index 43d8ec223..a2e0cf974 100644 --- a/tests/Benchmark/SnapshotsBench.php +++ b/tests/Benchmark/SnapshotsBench.php @@ -7,6 +7,7 @@ use Patchlevel\EventSourcing\Identifier\Identifier; use Patchlevel\EventSourcing\Repository\DefaultRepository; use Patchlevel\EventSourcing\Repository\Repository; +use Patchlevel\EventSourcing\Repository\StoreAdapter\StreamDoctrineDbalStoreAdapter; use Patchlevel\EventSourcing\Schema\DoctrineSchemaDirector; use Patchlevel\EventSourcing\Serializer\DefaultEventSerializer; use Patchlevel\EventSourcing\Snapshot\Adapter\InMemorySnapshotAdapter; @@ -43,7 +44,12 @@ public function setUp(): void $this->snapshotStore = new DefaultSnapshotStore(['default' => $this->adapter]); - $this->repository = new DefaultRepository($this->store, Profile::metadata(), null, $this->snapshotStore); + $this->repository = new DefaultRepository( + new StreamDoctrineDbalStoreAdapter($this->store), + Profile::metadata(), + null, + $this->snapshotStore + ); $schemaDirector = new DoctrineSchemaDirector( $connection, diff --git a/tests/Benchmark/SplitStreamBench.php b/tests/Benchmark/SplitStreamBench.php index 48ad920cf..8cdaae080 100644 --- a/tests/Benchmark/SplitStreamBench.php +++ b/tests/Benchmark/SplitStreamBench.php @@ -8,6 +8,7 @@ use Patchlevel\EventSourcing\Repository\DefaultRepository; use Patchlevel\EventSourcing\Repository\MessageDecorator\SplitStreamDecorator; use Patchlevel\EventSourcing\Repository\Repository; +use Patchlevel\EventSourcing\Repository\StoreAdapter\StreamDoctrineDbalStoreAdapter; use Patchlevel\EventSourcing\Schema\DoctrineSchemaDirector; use Patchlevel\EventSourcing\Serializer\DefaultEventSerializer; use Patchlevel\EventSourcing\Store\Store; @@ -37,7 +38,7 @@ public function setUp(): void ); $this->repository = new DefaultRepository( - $this->store, + new StreamDoctrineDbalStoreAdapter($this->store), Profile::metadata(), null, null, diff --git a/tests/Benchmark/SubscriptionEngineBatchBench.php b/tests/Benchmark/SubscriptionEngineBatchBench.php index f610ce9b2..43f5bbc28 100644 --- a/tests/Benchmark/SubscriptionEngineBatchBench.php +++ b/tests/Benchmark/SubscriptionEngineBatchBench.php @@ -7,6 +7,7 @@ use Patchlevel\EventSourcing\Identifier\Identifier; use Patchlevel\EventSourcing\Repository\DefaultRepository; use Patchlevel\EventSourcing\Repository\Repository; +use Patchlevel\EventSourcing\Repository\StoreAdapter\StreamDoctrineDbalStoreAdapter; use Patchlevel\EventSourcing\Schema\ChainDoctrineSchemaConfigurator; use Patchlevel\EventSourcing\Schema\DoctrineSchemaDirector; use Patchlevel\EventSourcing\Serializer\DefaultEventSerializer; @@ -43,7 +44,10 @@ public function setUp(): void DefaultEventSerializer::createFromPaths([__DIR__ . '/BasicImplementation/Events']), ); - $this->repository = new DefaultRepository($this->store, Profile::metadata()); + $this->repository = new DefaultRepository( + new StreamDoctrineDbalStoreAdapter($this->store), + Profile::metadata() + ); $subscriptionStore = new DoctrineSubscriptionStore( $connection, diff --git a/tests/Benchmark/SubscriptionEngineBench.php b/tests/Benchmark/SubscriptionEngineBench.php index 70783e7e8..5d5172021 100644 --- a/tests/Benchmark/SubscriptionEngineBench.php +++ b/tests/Benchmark/SubscriptionEngineBench.php @@ -8,6 +8,7 @@ use Patchlevel\EventSourcing\Metadata\Event\AttributeEventMetadataFactory; use Patchlevel\EventSourcing\Repository\DefaultRepository; use Patchlevel\EventSourcing\Repository\Repository; +use Patchlevel\EventSourcing\Repository\StoreAdapter\StreamDoctrineDbalStoreAdapter; use Patchlevel\EventSourcing\Schema\ChainDoctrineSchemaConfigurator; use Patchlevel\EventSourcing\Schema\DoctrineSchemaDirector; use Patchlevel\EventSourcing\Serializer\DefaultEventSerializer; @@ -44,7 +45,10 @@ public function setUp(): void DefaultEventSerializer::createFromPaths([__DIR__ . '/BasicImplementation/Events']), ); - $this->repository = new DefaultRepository($this->store, Profile::metadata()); + $this->repository = new DefaultRepository( + new StreamDoctrineDbalStoreAdapter($this->store), + Profile::metadata() + ); $subscriptionStore = new DoctrineSubscriptionStore( $connection, diff --git a/tests/Integration/BankAccountSplitStream/IntegrationTest.php b/tests/Integration/BankAccountSplitStream/IntegrationTest.php index 5e1f8be12..0db8e3fbb 100644 --- a/tests/Integration/BankAccountSplitStream/IntegrationTest.php +++ b/tests/Integration/BankAccountSplitStream/IntegrationTest.php @@ -10,6 +10,7 @@ use Patchlevel\EventSourcing\Repository\DefaultRepositoryManager; use Patchlevel\EventSourcing\Repository\MessageDecorator\ChainMessageDecorator; use Patchlevel\EventSourcing\Repository\MessageDecorator\SplitStreamDecorator; +use Patchlevel\EventSourcing\Repository\StoreAdapter\StreamDoctrineDbalStoreAdapter; use Patchlevel\EventSourcing\Schema\DoctrineSchemaDirector; use Patchlevel\EventSourcing\Serializer\DefaultEventSerializer; use Patchlevel\EventSourcing\Store\StreamDoctrineDbalStore; @@ -59,7 +60,7 @@ public function testSuccessful(): void $manager = new DefaultRepositoryManager( new AggregateRootRegistry(['bank_account' => BankAccount::class]), - $store, + new StreamDoctrineDbalStoreAdapter($store), null, null, new ChainMessageDecorator([ @@ -98,7 +99,7 @@ public function testSuccessful(): void $manager = new DefaultRepositoryManager( new AggregateRootRegistry(['bank_account' => BankAccount::class]), - $store, + new StreamDoctrineDbalStoreAdapter($store), null, null, new ChainMessageDecorator([ @@ -137,7 +138,7 @@ public function testSuccessful(): void $manager = new DefaultRepositoryManager( new AggregateRootRegistry(['bank_account' => BankAccount::class]), - $store, + new StreamDoctrineDbalStoreAdapter($store), null, null, new ChainMessageDecorator([ @@ -174,7 +175,7 @@ public function testRemoveArchived(): void $manager = new DefaultRepositoryManager( new AggregateRootRegistry(['bank_account' => BankAccount::class]), - $store, + new StreamDoctrineDbalStoreAdapter($store), null, null, new ChainMessageDecorator([ @@ -213,7 +214,7 @@ public function testRemoveArchived(): void $manager = new DefaultRepositoryManager( new AggregateRootRegistry(['bank_account' => BankAccount::class]), - $store, + new StreamDoctrineDbalStoreAdapter($store), null, null, new ChainMessageDecorator([ @@ -252,7 +253,7 @@ public function testRemoveArchived(): void $manager = new DefaultRepositoryManager( new AggregateRootRegistry(['bank_account' => BankAccount::class]), - $store, + new StreamDoctrineDbalStoreAdapter($store), null, null, new ChainMessageDecorator([