From a444c1373340e264145c838e97403ae67c46fc6a Mon Sep 17 00:00:00 2001 From: Garry Sharp <> Date: Sun, 31 Aug 2025 22:12:46 -0500 Subject: [PATCH] =?UTF-8?q?=F0=9F=8E=A8=20Add=20post=20tx=20checks=20for?= =?UTF-8?q?=20the=20positive=20cases?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/verifierapi/fees.go | 23 ++++------ plugin/fees/load.go | 5 -- plugin/fees/post_tx.go | 89 ++++++++++++++++++++++++++++++++---- plugin/fees/transaction.go | 34 +++++++++++--- storage/db.go | 7 ++- storage/postgres/fees.go | 4 +- 6 files changed, 124 insertions(+), 38 deletions(-) diff --git a/internal/verifierapi/fees.go b/internal/verifierapi/fees.go index 0a8e972..31b3b95 100644 --- a/internal/verifierapi/fees.go +++ b/internal/verifierapi/fees.go @@ -140,27 +140,24 @@ func (v *VerifierApi) CreateFeeCredit(id uuid.UUID, amount int64, publicKey stri return nil } -func (v *VerifierApi) RevertFeeCredit(txHash string, batchId uuid.UUID) error { - response, err := v.postAuth(fmt.Sprintf("/fees/revert/%s", batchId), struct{}{}) - if err != nil { - return fmt.Errorf("failed to revert fee credit: %w", err) - } - defer response.Body.Close() - - return nil -} - -func (v *VerifierApi) UpdateFeeBatchTxHash(publickey string, batchId uuid.UUID, hash string) (*FeeBatchCreateResponseDto, error) { +func (v *VerifierApi) UpdateFeeBatch(publickey string, batchId uuid.UUID, hash string, status types.FeeBatchState) (*APIResponse[FeeBatchCreateResponseDto], error) { response, err := v.putAuth("/fees/batch", FeeBatchUpdateRequestResponseDto{ PublicKey: publickey, BatchID: batchId, TxHash: hash, - Status: types.FeeBatchStateSent, + Status: status, }) if err != nil { return nil, fmt.Errorf("failed to get fee batch: %w", err) } defer response.Body.Close() + var feeBatchResponse APIResponse[FeeBatchCreateResponseDto] + if response.StatusCode != http.StatusOK { + return nil, fmt.Errorf("failed to update fee batch, status code: %d", response.StatusCode) + } + if err := json.NewDecoder(response.Body).Decode(&feeBatchResponse); err != nil { + return nil, fmt.Errorf("failed to decode fee batch response: %w", err) + } - return nil, nil + return &feeBatchResponse, nil } diff --git a/plugin/fees/load.go b/plugin/fees/load.go index 60b10b6..5ece03d 100644 --- a/plugin/fees/load.go +++ b/plugin/fees/load.go @@ -3,7 +3,6 @@ package fees import ( "context" "fmt" - "sync" "github.com/google/uuid" "github.com/hibiken/asynq" @@ -33,14 +32,11 @@ func (fp *FeePlugin) LoadFees(ctx context.Context, task *asynq.Task) error { // We limit the number of concurrent fee loading operations to 10 sem := semaphore.NewWeighted(int64(fp.config.Jobs.Load.MaxConcurrentJobs)) - var wg sync.WaitGroup var eg errgroup.Group for _, feePolicy := range feePolicies { - wg.Add(1) feePolicy = feePolicy eg.Go(func() error { - defer wg.Done() if err := sem.Acquire(ctx, 1); err != nil { return fmt.Errorf("failed to acquire semaphore: %w", err) } @@ -61,7 +57,6 @@ func (fp *FeePlugin) LoadFees(ctx context.Context, task *asynq.Task) error { }) } - wg.Wait() if err := eg.Wait(); err != nil { return fmt.Errorf("failed to execute fee loading: %w", err) } diff --git a/plugin/fees/post_tx.go b/plugin/fees/post_tx.go index b08f38e..0555c10 100644 --- a/plugin/fees/post_tx.go +++ b/plugin/fees/post_tx.go @@ -9,6 +9,7 @@ import ( "github.com/hibiken/asynq" "github.com/sirupsen/logrus" "github.com/vultisig/plugin/internal/types" + vtypes "github.com/vultisig/verifier/types" "golang.org/x/sync/errgroup" "golang.org/x/sync/semaphore" ) @@ -38,7 +39,12 @@ func (fp *FeePlugin) HandlePostTx(ctx context.Context, task *asynq.Task) error { return fmt.Errorf("failed to acquire semaphore: %w", err) } defer sem.Release(1) - return fp.updateStatus(ctx, feeBatch, currentBlock) + if err := fp.updateStatus(ctx, feeBatch, currentBlock); err != nil { + fp.logger.WithField("batch_id", feeBatch.BatchID).Error("Failed to update fee batch status", err) + } else { + fp.logger.WithField("batch_id", feeBatch.BatchID).Info("Fee batch status update run successfully") + } + return nil }) } if err := eg.Wait(); err != nil { @@ -61,6 +67,8 @@ func (fp *FeePlugin) updateStatus(ctx context.Context, batch types.FeeBatch, cur fp.logger.WithFields(logrus.Fields{"batch_id": batch.BatchID}).Info("tx not found on chain, rebroadcasting") return nil } + + // Tx successful if receipt.Status == 1 { if currentBlock > receipt.BlockNumber.Uint64()+fp.config.Jobs.Post.SuccessConfirmations { fp.logger.WithFields(logrus.Fields{"batch_id": batch.BatchID}).Info("tx successful, setting to success") @@ -69,6 +77,7 @@ func (fp *FeePlugin) updateStatus(ctx context.Context, batch types.FeeBatch, cur if err != nil { return err } + var rollbackErr error defer func() { if rollbackErr != nil { @@ -76,16 +85,24 @@ func (fp *FeePlugin) updateStatus(ctx context.Context, batch types.FeeBatch, cur } }() - fp.verifierApi.UpdateFeeBatchTxHash(*batch.TxHash, batch.BatchID, *batch.TxHash) + hash := "" + if batch.TxHash != nil { + hash = *batch.TxHash + } - if err = fp.db.SetFeeBatchStatus(ctx, tx, batch.BatchID, types.FeeBatchStateSuccess); err != nil { + resp, err := fp.verifierApi.UpdateFeeBatch(batch.PublicKey, batch.BatchID, hash, types.FeeBatchStateSuccess) + if err != nil { rollbackErr = err return fmt.Errorf("failed to update verifier fee batch to success: %w", err) } + if resp.Error.Message != "" { + rollbackErr = fmt.Errorf("failed to update verifier fee batch to success: %s", resp.Error.Message) + return fmt.Errorf("failed to update verifier fee batch to success: %s", resp.Error.Message) + } if err = fp.db.SetFeeBatchStatus(ctx, tx, batch.BatchID, types.FeeBatchStateSuccess); err != nil { rollbackErr = err - return fmt.Errorf("failed to set fee batch success: %w", err) + return fmt.Errorf("failed to update verifier fee batch to success: %w", err) } if err = tx.Commit(ctx); err != nil { @@ -97,10 +114,66 @@ func (fp *FeePlugin) updateStatus(ctx context.Context, batch types.FeeBatch, cur return nil } } else { - // TODO failed tx logic - fp.logger.WithFields(logrus.Fields{"batch_id": batch.BatchID}).Info("tx failed, setting to failed") - fp.verifierApi.RevertFeeCredit(*batch.TxHash, batch.BatchID) - return nil + // Handle failed tx - in this case, we simply set the batch to failed. And request for the verifier to create a new debit line of "failed tx" + return fp.handleFailedTx(ctx, batch) } return nil } + +// This function sets a batch id to be failed and requests for a new debit line to be created. The failed tx then gets picked up in a new batch. +func (fp *FeePlugin) handleFailedTx(ctx context.Context, batch types.FeeBatch) error { + fp.logger.WithFields(logrus.Fields{"batch_id": batch.BatchID}).Info("tx failed, setting to failed") + + tx, err := fp.db.Pool().Begin(ctx) + if err != nil { + return err + } + var rollbackErr error + defer func() { + if rollbackErr != nil { + tx.Rollback(ctx) + } + }() + + // This api call automatically creates a new debit line for the failed tx, which will get picked up in a new batch. + err = fp.db.SetFeeBatchStatus(ctx, tx, batch.BatchID, types.FeeBatchStateFailed) + if err != nil { + rollbackErr = err + return fmt.Errorf("failed to set fee batch status to failed: %w", err) + } + + hash := "" + if batch.TxHash != nil { + hash = *batch.TxHash + } + resp, err := fp.verifierApi.UpdateFeeBatch(batch.PublicKey, batch.BatchID, hash, types.FeeBatchStateFailed) + if err != nil { + rollbackErr = err + return fmt.Errorf("failed to update verifier fee batch to failed: %w", err) + } + if resp.Error.Message != "" { + rollbackErr = fmt.Errorf("failed to update verifier fee batch to failed: %s", resp.Error.Message) + return fmt.Errorf("failed to update verifier fee batch to failed: %s", resp.Error.Message) + } + + if err = tx.Commit(ctx); err != nil { + rollbackErr = err + return fmt.Errorf("failed to commit transaction: %w", err) + } + + feePolicies, err := fp.db.GetPluginPolicies(ctx, batch.PublicKey, vtypes.PluginVultisigFees_feee, true) + if err != nil { + rollbackErr = err + return fmt.Errorf("failed to get plugin policy: %w", err) + } + + if len(feePolicies) < 1 { + rollbackErr = err + return fmt.Errorf("failed to get plugin policy: %w", err) + } + + feePolicy := feePolicies[0] + + // Immediately load a new fee batch + return fp.executeFeeLoading(ctx, feePolicy) +} diff --git a/plugin/fees/transaction.go b/plugin/fees/transaction.go index b4c5f3a..79fdf19 100644 --- a/plugin/fees/transaction.go +++ b/plugin/fees/transaction.go @@ -150,8 +150,6 @@ func (fp *FeePlugin) initSign( feeBatch types.FeeBatch, ) error { - fmt.Printf("Init sign: %+v\n", req) - sigs, err := fp.signer.Sign(ctx, req) if err != nil { fp.logger.WithError(err).Error("Keysign failed") @@ -193,11 +191,35 @@ func (fp *FeePlugin) initSign( return fmt.Errorf("failed to decode tx: %w", err) } - if err := fp.db.SetFeeBatchSent(ctx, txHash.Hash().Hex(), feeBatch.BatchID); err != nil { + tx, err := fp.db.Pool().Begin(ctx) + if err != nil { + return fmt.Errorf("failed to begin transaction: %w", err) + } + var rollbackErr error + defer func() { + if rollbackErr != nil { + tx.Rollback(ctx) + } + }() + + if err := fp.db.SetFeeBatchSent(ctx, tx, txHash.Hash().Hex(), feeBatch.BatchID); err != nil { + rollbackErr = err return fmt.Errorf("failed to set fee batch sent: %w", err) } + resp, err := fp.verifierApi.UpdateFeeBatch(pluginPolicy.PublicKey, feeBatch.BatchID, txHash.Hash().Hex(), types.FeeBatchStateSent) + if err != nil { + rollbackErr = err + return fmt.Errorf("failed to update fee batch: %w", err) + } + if resp.Error.Message != "" { + rollbackErr = err + return fmt.Errorf("failed to update fee batch: %s", resp.Error.Message) + } - fp.verifierApi.UpdateFeeBatchTxHash(pluginPolicy.PublicKey, feeBatch.BatchID, txHash.Hash().Hex()) + if err := tx.Commit(ctx); err != nil { + rollbackErr = err + return fmt.Errorf("failed to commit transaction: %w", err) + } fp.logger.WithFields(logrus.Fields{ "tx_hash": txHash.Hash().Hex(), @@ -208,14 +230,14 @@ func (fp *FeePlugin) initSign( "batch_id": feeBatch.BatchID, }).Info("fee collection transaction") - tx, err := fp.eth.Send(ctx, decodedHexTx, r, s, v) + ethTx, err := fp.eth.Send(ctx, decodedHexTx, r, s, v) if err != nil { fp.logger.WithError(err).WithField("tx_hex", req.Transaction).Error("fp.eth.Send") return fmt.Errorf("failed to send transaction: %w", err) } fp.logger.WithFields(logrus.Fields{ - "tx_hash": tx.Hash().Hex(), + "tx_hash": ethTx.Hash().Hex(), "tx_to": erc20tx.to.Hex(), "tx_amount": erc20tx.amount.String(), "tx_token": erc20tx.token.Hex(), diff --git a/storage/db.go b/storage/db.go index 1e9ba06..26f38a4 100644 --- a/storage/db.go +++ b/storage/db.go @@ -21,12 +21,11 @@ type DatabaseStorage interface { InsertPluginPolicyTx(ctx context.Context, dbTx pgx.Tx, policy vtypes.PluginPolicy) (*vtypes.PluginPolicy, error) UpdatePluginPolicyTx(ctx context.Context, dbTx pgx.Tx, policy vtypes.PluginPolicy) (*vtypes.PluginPolicy, error) - CreateFeeBatch(ctx context.Context, tx pgx.Tx, batches ...types.FeeBatch) ([]types.FeeBatch, error) - SetFeeBatchTxHash(ctx context.Context, tx pgx.Tx, batchId uuid.UUID, txHash string) error - SetFeeBatchStatus(ctx context.Context, tx pgx.Tx, batchId uuid.UUID, status types.FeeBatchState) error GetFeeBatch(ctx context.Context, batchIDs ...uuid.UUID) ([]types.FeeBatch, error) GetFeeBatchByStatus(ctx context.Context, status types.FeeBatchState) ([]types.FeeBatch, error) - SetFeeBatchSent(ctx context.Context, txHash string, batchId uuid.UUID) error + CreateFeeBatch(ctx context.Context, tx pgx.Tx, batches ...types.FeeBatch) ([]types.FeeBatch, error) + SetFeeBatchStatus(ctx context.Context, tx pgx.Tx, batchId uuid.UUID, status types.FeeBatchState) error + SetFeeBatchSent(ctx context.Context, tx pgx.Tx, txHash string, batchId uuid.UUID) error Pool() *pgxpool.Pool } diff --git a/storage/postgres/fees.go b/storage/postgres/fees.go index cc51670..1ccfb6e 100644 --- a/storage/postgres/fees.go +++ b/storage/postgres/fees.go @@ -79,8 +79,8 @@ func (p *PostgresBackend) GetFeeBatchByStatus(ctx context.Context, status types. return feeBatches, nil } -func (p *PostgresBackend) SetFeeBatchSent(ctx context.Context, txHash string, batchId uuid.UUID) error { +func (p *PostgresBackend) SetFeeBatchSent(ctx context.Context, tx pgx.Tx, txHash string, batchId uuid.UUID) error { query := `update fee_batch set status = $1, tx_hash = $2 where batch_id = $3` - _, err := p.pool.Exec(ctx, query, types.FeeBatchStateSent, txHash, batchId) + _, err := tx.Exec(ctx, query, types.FeeBatchStateSent, txHash, batchId) return err }