Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 8 additions & 51 deletions src/Repository/DefaultRepository.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,28 +11,20 @@
use Patchlevel\EventSourcing\Message\Message;
use Patchlevel\EventSourcing\Metadata\AggregateRoot\AggregateRootMetadata;
use Patchlevel\EventSourcing\Repository\MessageDecorator\MessageDecorator;
use Patchlevel\EventSourcing\Repository\StoreAdapter\StoreAdapter;
use Patchlevel\EventSourcing\Snapshot\SnapshotNotFound;
use Patchlevel\EventSourcing\Snapshot\SnapshotStore;
use Patchlevel\EventSourcing\Snapshot\SnapshotVersionInvalid;
use Patchlevel\EventSourcing\Store\Criteria\ArchivedCriterion;
use Patchlevel\EventSourcing\Store\Criteria\Criteria;
use Patchlevel\EventSourcing\Store\Criteria\FromPlayheadCriterion;
use Patchlevel\EventSourcing\Store\Criteria\StreamCriterion;
use Patchlevel\EventSourcing\Store\Criteria\ToPlayheadCriterion;
use Patchlevel\EventSourcing\Store\Header\PlayheadHeader;
use Patchlevel\EventSourcing\Store\Header\RecordedOnHeader;
use Patchlevel\EventSourcing\Store\Header\StreamNameHeader;
use Patchlevel\EventSourcing\Store\Store;
use Patchlevel\EventSourcing\Store\Stream;
use Patchlevel\EventSourcing\Store\StreamStartHeader;
use Patchlevel\EventSourcing\Store\UniqueConstraintViolation;
use Psr\Clock\ClockInterface;
use Psr\Log\LoggerInterface;
use Psr\Log\NullLogger;
use Throwable;
use Traversable;
use WeakMap;

use function array_map;
use function assert;
use function count;
Expand All @@ -52,7 +44,7 @@ final class DefaultRepository implements Repository

/** @param AggregateRootMetadata<T> $metadata */
public function __construct(
private readonly Store $store,
private readonly StoreAdapter $messageAdapter,
private readonly AggregateRootMetadata $metadata,
private readonly EventBus|null $eventBus = null,
private readonly SnapshotStore|null $snapshotStore = null,
Expand Down Expand Up @@ -110,15 +102,12 @@ public function load(Identifier $id): AggregateRoot
}
}

$criteria = new Criteria(
new StreamCriterion($this->metadata->streamName($id->toString())),
new ArchivedCriterion(false),
);

$stream = null;

try {
$stream = $this->store->load($criteria);
$stream = $this->messageAdapter->load(
$this->metadata->streamName($id->toString()),
);

$firstMessage = $stream->current();

Expand Down Expand Up @@ -163,11 +152,7 @@ public function load(Identifier $id): AggregateRoot

public function has(Identifier $id): bool
{
$criteria = new Criteria(
new StreamCriterion($this->metadata->streamName($id->toString())),
);

return $this->store->count($criteria) > 0;
return $this->messageAdapter->count($this->metadata->streamName($id->toString())) > 0;
}

/** @param T $aggregate */
Expand Down Expand Up @@ -223,50 +208,27 @@ public function save(AggregateRoot $aggregate): void

$streamName = $this->metadata->streamName($aggregateId);

$archiveTo = null;

$messages = array_map(
static function (object $event) use (
&$playhead,
&$archiveTo,
$messageDecorator,
$clock,
$streamName,
) {
$message = Message::create($event)
->withHeader(new StreamNameHeader($streamName))
->withHeader(new PlayheadHeader(++$playhead))
->withHeader(new RecordedOnHeader($clock->now()));

if ($messageDecorator) {
$message = $messageDecorator($message);
}

if ($message->hasHeader(StreamStartHeader::class)) {
$archiveTo = $playhead;
}

return $message;
},
$events,
);

try {
if ($archiveTo !== null) {
$this->store->transactional(
function () use ($messages, $streamName, $archiveTo): void {
$this->store->save(...$messages);
$this->store->archive(
new Criteria(
new StreamCriterion($streamName),
new ToPlayheadCriterion($archiveTo),
),
);
},
);
} else {
$this->store->save(...$messages);
}
$this->messageAdapter->write($streamName, $messages);
} catch (UniqueConstraintViolation) {
if ($newAggregate) {
$this->logger->error(
Expand Down Expand Up @@ -320,15 +282,10 @@ private function loadFromSnapshot(string $aggregateClass, Identifier $id): Aggre

$aggregate = $this->snapshotStore->load($aggregateClass, $id);

$criteria = new Criteria(
new StreamCriterion($this->metadata->streamName($id->toString())),
new FromPlayheadCriterion($aggregate->playhead()),
);

$stream = null;

try {
$stream = $this->store->load($criteria);
$stream = $this->messageAdapter->load($this->metadata->streamName($id->toString()), $aggregate->playhead());

if ($stream->current() === null) {
$this->aggregateIsValid[$aggregate] = true;
Expand Down
7 changes: 3 additions & 4 deletions src/Repository/DefaultRepositoryManager.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,11 @@
use Patchlevel\EventSourcing\Metadata\AggregateRoot\AggregateRootMetadataFactory;
use Patchlevel\EventSourcing\Metadata\AggregateRoot\AggregateRootRegistry;
use Patchlevel\EventSourcing\Repository\MessageDecorator\MessageDecorator;
use Patchlevel\EventSourcing\Repository\StoreAdapter\StoreAdapter;
use Patchlevel\EventSourcing\Snapshot\SnapshotStore;
use Patchlevel\EventSourcing\Store\Store;
use Psr\Clock\ClockInterface;
use Psr\Log\LoggerInterface;
use Psr\Log\NullLogger;

use function array_key_exists;

final class DefaultRepositoryManager implements RepositoryManager
Expand All @@ -31,7 +30,7 @@ final class DefaultRepositoryManager implements RepositoryManager

public function __construct(
private readonly AggregateRootRegistry $aggregateRootRegistry,
private readonly Store $store,
private readonly StoreAdapter $storeAdapter,
private readonly EventBus|null $eventBus = null,
private readonly SnapshotStore|null $snapshotStore = null,
private readonly MessageDecorator|null $messageDecorator = null,
Expand Down Expand Up @@ -65,7 +64,7 @@ public function get(string $aggregateClass): Repository
}

return $this->instances[$aggregateClass] = new DefaultRepository(
$this->store,
$this->storeAdapter,
$this->metadataFactory->metadata($aggregateClass),
$this->eventBus,
$this->snapshotStore,
Expand Down
18 changes: 18 additions & 0 deletions src/Repository/StoreAdapter/StoreAdapter.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
<?php

namespace Patchlevel\EventSourcing\Repository\StoreAdapter;

use Patchlevel\EventSourcing\Message\Message;
use Patchlevel\EventSourcing\Store\Stream;

interface StoreAdapter
{
public function load(string $stream, int|null $fromPlayhead = null): Stream;

public function count(string $stream): int;

/**
* @param iterable<Message> $messages
*/
public function write(string $stream, iterable $messages): void;
}
93 changes: 93 additions & 0 deletions src/Repository/StoreAdapter/StreamDoctrineDbalStoreAdapter.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
<?php

namespace Patchlevel\EventSourcing\Repository\StoreAdapter;

use Patchlevel\EventSourcing\Message\Message;
use Patchlevel\EventSourcing\Store\Criteria\ArchivedCriterion;
use Patchlevel\EventSourcing\Store\Criteria\Criteria;
use Patchlevel\EventSourcing\Store\Criteria\FromPlayheadCriterion;
use Patchlevel\EventSourcing\Store\Criteria\StreamCriterion;
use Patchlevel\EventSourcing\Store\Criteria\ToPlayheadCriterion;
use Patchlevel\EventSourcing\Store\Header\PlayheadHeader;
use Patchlevel\EventSourcing\Store\Header\StreamNameHeader;
use Patchlevel\EventSourcing\Store\Stream;
use Patchlevel\EventSourcing\Store\StreamDoctrineDbalStore;
use Patchlevel\EventSourcing\Store\StreamStartHeader;
use Throwable;

final readonly class StreamDoctrineDbalStoreAdapter implements StoreAdapter
{
public function __construct(
private StreamDoctrineDbalStore $store,
) {
}

public function load(string $stream, int|null $fromPlayhead = null): Stream
{
$criteria = new Criteria(
new StreamCriterion($stream),
new ArchivedCriterion(false),
);

if ($fromPlayhead !== null) {
$criteria->add(new FromPlayheadCriterion($fromPlayhead));
}

return $this->store->load($criteria);
}

public function count(string $stream): int
{
$criteria = new Criteria(
new StreamCriterion($stream),
);

return $this->store->count($criteria);
}

/**
* @param iterable<Message> $messages
*/
public function write(string $stream, iterable $messages): void
{
$archiveTo = null;

$this->store->save(
...array_map(
static function (Message $message) use (
$stream,
&$archiveTo
) {
if ($message->hasHeader(StreamStartHeader::class)) {
try {
$archiveTo = $message->header(PlayheadHeader::class)->playhead;
} catch (Throwable) {
}
}

return $message->withHeader(new StreamNameHeader($stream));
},
$messages,
)
);

if ($archiveTo === null) {
$this->store->save(...$messages);

return;
}

$this->store->transactional(
static function () use ($stream, $archiveTo, $messages): void {
$this->store->save(...$messages);

$this->store->archive(
new Criteria(
new StreamCriterion($stream),
new ToPlayheadCriterion($archiveTo),
),
);
}
);
}
}
94 changes: 94 additions & 0 deletions src/Repository/StoreAdapter/TaggableDoctrineDbalStoreAdapter.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
<?php

namespace Patchlevel\EventSourcing\Repository\StoreAdapter;

use Patchlevel\EventSourcing\Message\Message;
use Patchlevel\EventSourcing\Store\Criteria\ArchivedCriterion;
use Patchlevel\EventSourcing\Store\Criteria\Criteria;
use Patchlevel\EventSourcing\Store\Criteria\FromPlayheadCriterion;
use Patchlevel\EventSourcing\Store\Criteria\StreamCriterion;
use Patchlevel\EventSourcing\Store\Criteria\ToPlayheadCriterion;
use Patchlevel\EventSourcing\Store\Header\PlayheadHeader;
use Patchlevel\EventSourcing\Store\Header\StreamNameHeader;
use Patchlevel\EventSourcing\Store\Stream;
use Patchlevel\EventSourcing\Store\StreamDoctrineDbalStore;
use Patchlevel\EventSourcing\Store\StreamStartHeader;
use Patchlevel\EventSourcing\Store\TaggableDoctrineDbalStore;
use Throwable;

final readonly class TaggableDoctrineDbalStoreAdapter implements StoreAdapter
{
public function __construct(
private TaggableDoctrineDbalStore $store,
) {
}

public function load(string $stream, int|null $fromPlayhead = null): Stream
{
$criteria = new Criteria(
new StreamCriterion($stream),
new ArchivedCriterion(false),
);

if ($fromPlayhead !== null) {
$criteria->add(new FromPlayheadCriterion($fromPlayhead));
}

return $this->store->load($criteria);
}

public function count(string $stream): int
{
$criteria = new Criteria(
new StreamCriterion($stream),
);

return $this->store->count($criteria);
}

/**
* @param iterable<Message> $messages
*/
public function write(string $stream, iterable $messages): void
{
$archiveTo = null;

$this->store->save(
...array_map(
static function (Message $message) use (
$stream,
&$archiveTo
) {
if ($message->hasHeader(StreamStartHeader::class)) {
try {
$archiveTo = $message->header(PlayheadHeader::class)->playhead;
} catch (Throwable) {
}
}

return $message->withHeader(new StreamNameHeader($stream));
},
$messages,
)
);

if ($archiveTo === null) {
$this->store->save(...$messages);

return;
}

$this->store->transactional(
static function () use ($stream, $archiveTo, $messages): void {
$this->store->save(...$messages);

$this->store->archive(
new Criteria(
new StreamCriterion($stream),
new ToPlayheadCriterion($archiveTo),
),
);
}
);
}
}
3 changes: 2 additions & 1 deletion tests/Benchmark/CommandToQueryBench.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
use Patchlevel\EventSourcing\QueryBus\ServiceHandlerProvider;
use Patchlevel\EventSourcing\QueryBus\SyncQueryBus;
use Patchlevel\EventSourcing\Repository\DefaultRepositoryManager;
use Patchlevel\EventSourcing\Repository\StoreAdapter\StreamDoctrineDbalStoreAdapter;
use Patchlevel\EventSourcing\Schema\DoctrineSchemaDirector;
use Patchlevel\EventSourcing\Serializer\DefaultEventSerializer;
use Patchlevel\EventSourcing\Snapshot\Adapter\InMemorySnapshotAdapter;
Expand Down Expand Up @@ -56,7 +57,7 @@ public function setUp(): void

$manager = new DefaultRepositoryManager(
$aggregateRootRegistry,
$store,
new StreamDoctrineDbalStoreAdapter($store),
null,
new DefaultSnapshotStore(['default' => new InMemorySnapshotAdapter()]),
);
Expand Down
Loading