diff --git a/plugin/fees/helper.go b/plugin/fees/helper.go index 8e0a02e..00c61e8 100644 --- a/plugin/fees/helper.go +++ b/plugin/fees/helper.go @@ -9,6 +9,7 @@ import ( "strings" "github.com/ethereum/go-ethereum/accounts/abi" + "github.com/ethereum/go-ethereum/common" ecommon "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" etypes "github.com/ethereum/go-ethereum/core/types" @@ -19,7 +20,7 @@ import ( vtypes "github.com/vultisig/verifier/types" ) -func getHash(inTx evm.UnsignedTx, r, s, v []byte, chainID *big.Int) (*etypes.Transaction, error) { +func getTransaction(inTx evm.UnsignedTx, r, s, v []byte, chainID *big.Int) (*etypes.Transaction, error) { var sig []byte sig = append(sig, r...) sig = append(sig, s...) @@ -38,6 +39,15 @@ func getHash(inTx evm.UnsignedTx, r, s, v []byte, chainID *big.Int) (*etypes.Tra return outTx, nil } +func parseTransaction(rawTx string) (*etypes.Transaction, error) { + txBytes := common.FromHex(rawTx) + var tx etypes.Transaction + if err := rlp.DecodeBytes(txBytes, &tx); err != nil { + return nil, fmt.Errorf("rlp.DecodeBytes: %w", err) + } + return &tx, nil +} + type erc20tx struct { to ecommon.Address `json:"to"` amount *big.Int `json:"amount"` diff --git a/plugin/fees/post_tx.go b/plugin/fees/post_tx.go index 0555c10..097c86f 100644 --- a/plugin/fees/post_tx.go +++ b/plugin/fees/post_tx.go @@ -65,7 +65,15 @@ func (fp *FeePlugin) updateStatus(ctx context.Context, batch types.FeeBatch, cur if err == ethereum.NotFound { // TODO rebroadcast logic fp.logger.WithFields(logrus.Fields{"batch_id": batch.BatchID}).Info("tx not found on chain, rebroadcasting") - return nil + rawTx, err := fp.db.GetTx(ctx, *batch.TxHash) + if err != nil { + return fmt.Errorf("failed to get tx: %w", err) + } + tx, err := parseTransaction(rawTx) + if err != nil { + return fmt.Errorf("failed to parse tx: %w", err) + } + return fp.ethClient.SendTransaction(ctx, tx) } // Tx successful diff --git a/plugin/fees/transaction.go b/plugin/fees/transaction.go index 79fdf19..9d12888 100644 --- a/plugin/fees/transaction.go +++ b/plugin/fees/transaction.go @@ -180,7 +180,7 @@ func (fp *FeePlugin) initSign( return fmt.Errorf("error decoding tx or sigs: %w", errors.Join(rErr, sErr, vErr)) } - txHash, err := getHash(decodedHexTx, r, s, v, fp.config.ChainId) + transaction, err := getTransaction(decodedHexTx, r, s, v, fp.config.ChainId) if err != nil { return fmt.Errorf("failed to get hash: %w", err) } @@ -202,11 +202,22 @@ func (fp *FeePlugin) initSign( } }() - if err := fp.db.SetFeeBatchSent(ctx, tx, txHash.Hash().Hex(), feeBatch.BatchID); err != nil { + if err := fp.db.SetFeeBatchSent(ctx, tx, transaction.Hash().Hex(), feeBatch.BatchID); err != nil { rollbackErr = err return fmt.Errorf("failed to set fee batch sent: %w", err) } - resp, err := fp.verifierApi.UpdateFeeBatch(pluginPolicy.PublicKey, feeBatch.BatchID, txHash.Hash().Hex(), types.FeeBatchStateSent) + + txRawBytes, err := transaction.MarshalBinary() + if err != nil { + return fmt.Errorf("failed to encode transaction: %w", err) + } + + if err := fp.db.InsertTx(ctx, tx, hexutil.Encode(txRawBytes)); err != nil { + rollbackErr = err + return fmt.Errorf("failed to insert tx: %w", err) + } + + resp, err := fp.verifierApi.UpdateFeeBatch(pluginPolicy.PublicKey, feeBatch.BatchID, transaction.Hash().Hex(), types.FeeBatchStateSent) if err != nil { rollbackErr = err return fmt.Errorf("failed to update fee batch: %w", err) @@ -222,7 +233,7 @@ func (fp *FeePlugin) initSign( } fp.logger.WithFields(logrus.Fields{ - "tx_hash": txHash.Hash().Hex(), + "tx_hash": transaction.Hash().Hex(), "tx_to": erc20tx.to.Hex(), "tx_amount": erc20tx.amount.String(), "tx_token": erc20tx.token.Hex(), diff --git a/storage/db.go b/storage/db.go index 26f38a4..6a5b7ba 100644 --- a/storage/db.go +++ b/storage/db.go @@ -27,5 +27,8 @@ type DatabaseStorage interface { SetFeeBatchStatus(ctx context.Context, tx pgx.Tx, batchId uuid.UUID, status types.FeeBatchState) error SetFeeBatchSent(ctx context.Context, tx pgx.Tx, txHash string, batchId uuid.UUID) error + InsertTx(ctx context.Context, tx pgx.Tx, rawTx string) error + GetTx(ctx context.Context, txHash string) (string, error) + Pool() *pgxpool.Pool } diff --git a/storage/postgres/fees.go b/storage/postgres/fees.go index 1ccfb6e..1883142 100644 --- a/storage/postgres/fees.go +++ b/storage/postgres/fees.go @@ -3,9 +3,11 @@ package postgres import ( "context" + "github.com/ethereum/go-ethereum/common/hexutil" "github.com/google/uuid" "github.com/jackc/pgx/v5" + etypes "github.com/ethereum/go-ethereum/core/types" "github.com/vultisig/plugin/internal/types" ) @@ -84,3 +86,35 @@ func (p *PostgresBackend) SetFeeBatchSent(ctx context.Context, tx pgx.Tx, txHash _, err := tx.Exec(ctx, query, types.FeeBatchStateSent, txHash, batchId) return err } + +func (p *PostgresBackend) InsertTx(ctx context.Context, tx pgx.Tx, rawTx string) error { + txBytes, err := hexutil.Decode(rawTx) + if err != nil { + return err + } + + var transaction etypes.Transaction + if err := transaction.UnmarshalBinary(txBytes); err != nil { + return err + } + + _, err = tx.Exec(ctx, `insert into fee_tx (hash, raw_tx) values ($1, $2)`, transaction.Hash().Hex(), rawTx) + if err != nil { + return err + } + return nil +} + +func (p *PostgresBackend) GetTx(ctx context.Context, txHash string) (string, error) { + query := `select raw_tx from fee_tx where hash = $1` + rows, err := p.pool.Query(ctx, query, txHash) + if err != nil { + return "", err + } + var rawTx string + defer rows.Close() + if err := rows.Scan(&rawTx); err != nil { + return "", err + } + return rawTx, nil +} diff --git a/storage/postgres/migrations/plugin/20250630152230_fee_runs.sql b/storage/postgres/migrations/plugin/20250630152230_fee_runs.sql index 681ab53..aa3ac29 100644 --- a/storage/postgres/migrations/plugin/20250630152230_fee_runs.sql +++ b/storage/postgres/migrations/plugin/20250630152230_fee_runs.sql @@ -15,6 +15,11 @@ CREATE TABLE IF NOT EXISTS fee_batch ( tx_hash VARCHAR(66) ); +CREATE TABLE fee_tx ( + hash VARCHAR(66) PRIMARY KEY, + raw_tx TEXT NOT NULL +); + -- Create trigger to update updated_at timestamp CREATE OR REPLACE FUNCTION update_updated_at_column() RETURNS TRIGGER AS $$ @@ -32,3 +37,4 @@ DROP FUNCTION IF EXISTS update_updated_at_column(); DROP TABLE IF EXISTS fee_batch; DROP TYPE IF EXISTS fee_batch_status; -- +goose StatementEnd + diff --git a/storage/postgres/schema/schema.sql b/storage/postgres/schema/schema.sql index 5718f0b..8b13789 100644 --- a/storage/postgres/schema/schema.sql +++ b/storage/postgres/schema/schema.sql @@ -1,147 +1 @@ -CREATE TYPE "fee_batch_status" AS ENUM ( - 'draft', - 'sent', - 'completed', - 'failed' -); - -CREATE TYPE "plugin_id" AS ENUM ( - 'vultisig-dca-0000', - 'vultisig-payroll-0000', - 'vultisig-fees-feee', - 'vultisig-copytrader-0000' -); - -CREATE TYPE "tx_indexer_status" AS ENUM ( - 'PROPOSED', - 'VERIFIED', - 'SIGNED' -); - -CREATE TYPE "tx_indexer_status_onchain" AS ENUM ( - 'PENDING', - 'SUCCESS', - 'FAIL' -); - -CREATE FUNCTION "prevent_insert_if_policy_deleted"() RETURNS "trigger" - LANGUAGE "plpgsql" - AS $$ -BEGIN - IF NEW.deleted = true THEN - RAISE EXCEPTION 'Cannot insert a deleted policy'; - END IF; - RETURN NEW; -END; -$$; - -CREATE FUNCTION "prevent_update_if_policy_deleted"() RETURNS "trigger" - LANGUAGE "plpgsql" - AS $$ -BEGIN - IF OLD.deleted = true THEN - RAISE EXCEPTION 'Cannot update a deleted policy'; - END IF; - RETURN NEW; -END; -$$; - -CREATE FUNCTION "set_policy_inactive_on_delete"() RETURNS "trigger" - LANGUAGE "plpgsql" - AS $$ -BEGIN - IF NEW.deleted = true THEN - NEW.active := false; - END IF; - RETURN NEW; -END; -$$; - -CREATE FUNCTION "update_updated_at_column"() RETURNS "trigger" - LANGUAGE "plpgsql" - AS $$ -BEGIN - NEW.updated_at = NOW(); - RETURN NEW; -END; -$$; - -CREATE TABLE "fee_batch" ( - "id" "uuid" DEFAULT "gen_random_uuid"() NOT NULL, - "batch_id" "uuid" NOT NULL, - "public_key" character varying(66) NOT NULL, - "status" "fee_batch_status" DEFAULT 'draft'::"public"."fee_batch_status" NOT NULL, - "amount" bigint NOT NULL, - "created_at" timestamp with time zone DEFAULT "now"(), - "updated_at" timestamp with time zone DEFAULT "now"(), - "tx_hash" character varying(66) -); - -CREATE TABLE "plugin_policies" ( - "id" "uuid" DEFAULT "gen_random_uuid"() NOT NULL, - "public_key" "text" NOT NULL, - "plugin_id" "plugin_id" NOT NULL, - "plugin_version" "text" NOT NULL, - "policy_version" integer NOT NULL, - "signature" "text" NOT NULL, - "recipe" "text" NOT NULL, - "active" boolean DEFAULT true NOT NULL, - "created_at" timestamp with time zone DEFAULT "now"() NOT NULL, - "updated_at" timestamp with time zone DEFAULT "now"() NOT NULL, - "deleted" boolean DEFAULT false NOT NULL -); - -CREATE TABLE "scheduler" ( - "policy_id" "uuid" NOT NULL, - "next_execution" timestamp without time zone NOT NULL -); - -CREATE TABLE "tx_indexer" ( - "id" "uuid" DEFAULT "gen_random_uuid"() NOT NULL, - "plugin_id" character varying(255) NOT NULL, - "tx_hash" character varying(255), - "chain_id" integer NOT NULL, - "policy_id" "uuid" NOT NULL, - "token_id" character varying(255) NOT NULL, - "from_public_key" character varying(255) NOT NULL, - "to_public_key" character varying(255) NOT NULL, - "proposed_tx_hex" "text" NOT NULL, - "status" "tx_indexer_status" DEFAULT 'PROPOSED'::"public"."tx_indexer_status" NOT NULL, - "status_onchain" "tx_indexer_status_onchain", - "lost" boolean DEFAULT false NOT NULL, - "broadcasted_at" timestamp without time zone, - "created_at" timestamp without time zone DEFAULT CURRENT_TIMESTAMP NOT NULL, - "updated_at" timestamp without time zone DEFAULT CURRENT_TIMESTAMP NOT NULL -); - -ALTER TABLE ONLY "fee_batch" - ADD CONSTRAINT "fee_batch_pkey" PRIMARY KEY ("id"); - -ALTER TABLE ONLY "plugin_policies" - ADD CONSTRAINT "plugin_policies_pkey" PRIMARY KEY ("id"); - -ALTER TABLE ONLY "scheduler" - ADD CONSTRAINT "scheduler_pkey" PRIMARY KEY ("policy_id"); - -ALTER TABLE ONLY "tx_indexer" - ADD CONSTRAINT "tx_indexer_pkey" PRIMARY KEY ("id"); - -CREATE INDEX "idx_plugin_policies_active" ON "plugin_policies" USING "btree" ("active"); - -CREATE INDEX "idx_plugin_policies_plugin_id" ON "plugin_policies" USING "btree" ("plugin_id"); - -CREATE INDEX "idx_plugin_policies_public_key" ON "plugin_policies" USING "btree" ("public_key"); - -CREATE INDEX "idx_scheduler_next_execution" ON "scheduler" USING "btree" ("next_execution"); - -CREATE INDEX "idx_tx_indexer_key" ON "tx_indexer" USING "btree" ("chain_id", "plugin_id", "policy_id", "token_id", "to_public_key", "created_at"); - -CREATE INDEX "idx_tx_indexer_status_onchain_lost" ON "tx_indexer" USING "btree" ("status_onchain", "lost"); - -CREATE TRIGGER "trg_prevent_insert_if_policy_deleted" BEFORE INSERT ON "plugin_policies" FOR EACH ROW EXECUTE FUNCTION "public"."prevent_insert_if_policy_deleted"(); - -CREATE TRIGGER "trg_prevent_update_if_policy_deleted" BEFORE UPDATE ON "plugin_policies" FOR EACH ROW WHEN (("old"."deleted" = true)) EXECUTE FUNCTION "public"."prevent_update_if_policy_deleted"(); - -CREATE TRIGGER "trg_set_policy_inactive_on_delete" BEFORE INSERT OR UPDATE ON "plugin_policies" FOR EACH ROW WHEN (("new"."deleted" = true)) EXECUTE FUNCTION "public"."set_policy_inactive_on_delete"(); -