From a17feaf33781a10f715c4d300ff35b3c0d8ac334 Mon Sep 17 00:00:00 2001 From: Ahmed Mohamed Date: Tue, 20 May 2025 04:34:57 +1000 Subject: [PATCH 1/2] Add providers for consolidation and deposit requests. --- CHANGELOG.md | 3 + main.go | 2 +- services/chaindb/filters.go | 64 +++++++ services/chaindb/mock/service.go | 40 +++++ .../postgresql/consolidationrequests.go | 163 ++++++++++++++++++ .../chaindb/postgresql/depositrequests.go | 158 +++++++++++++++++ services/chaindb/service.go | 12 ++ 7 files changed, 441 insertions(+), 1 deletion(-) create mode 100644 services/chaindb/postgresql/consolidationrequests.go create mode 100644 services/chaindb/postgresql/depositrequests.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 06cb4e7..04b59b7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,6 @@ +0.9.2: + - add provider functions for consolidation and deposit requests + 0.9.1: - add commit hash to startup - warn if blobs are no longer available in the beacon node for a block, but continue diff --git a/main.go b/main.go index 8460c45..3d086ef 100644 --- a/main.go +++ b/main.go @@ -64,7 +64,7 @@ import ( ) // ReleaseVersion is the release version for the code. -var ReleaseVersion = "0.9.1" +var ReleaseVersion = "0.9.2" func main() { os.Exit(main2()) diff --git a/services/chaindb/filters.go b/services/chaindb/filters.go index cc615f8..dcee4c5 100644 --- a/services/chaindb/filters.go +++ b/services/chaindb/filters.go @@ -367,3 +367,67 @@ type WithdrawalFilter struct { // If nil then no filter is applied. Canonical *bool } + +// ConsolidationRequestFilter defines a filter for fetching consolidation requests. +// Filter elements are ANDed together. +// Results are always returned in ascending (slot,index) order. +type ConsolidationRequestFilter struct { + // Limit is the maximum number of items to return. + Limit uint32 + + // Order is either OrderEarliest, in which case the earliest results + // that match the filter are returned, or OrderLatest, in which case the + // latest results that match the filter are returned. + // The default is OrderEarliest. + Order Order + + // From is the earliest slot from which to fetch items. + // If nil then there is no earliest slot. + From *phase0.Slot + + // To is the latest slot to which to fetch items. + // If nil then there is no latest slot. + To *phase0.Slot + + // SourcePubkeys is the list of source validator public keys for which to obtain items. + // If nil then no filter is applied. + SourcePubkeys []phase0.BLSPubKey + + // TargetPubkeys is the list of target validator public keys for which to obtain items. + // If nil then no filter is applied. + TargetPubkeys []phase0.BLSPubKey + + // BlockRoots is the list of block roots for which to obtain items. + // If nil then no filter is applied. + BlockRoots []phase0.Root +} + +// DepositRequestFilter defines a filter for fetching deposit requests. +// Filter elements are ANDed together. +// Results are always returned in ascending (slot,index) order. +type DepositRequestFilter struct { + // Limit is the maximum number of items to return. + Limit uint32 + + // Order is either OrderEarliest, in which case the earliest results + // that match the filter are returned, or OrderLatest, in which case the + // latest results that match the filter are returned. + // The default is OrderEarliest. + Order Order + + // From is the earliest slot from which to fetch items. + // If nil then there is no earliest slot. + From *phase0.Slot + + // To is the latest slot to which to fetch items. + // If nil then there is no latest slot. + To *phase0.Slot + + // ValidatorPubkeys is the list of validator public keys for which to obtain items. + // If nil then no filter is applied. + ValidatorPubkeys []phase0.BLSPubKey + + // BlockRoots is the list of block roots for which to obtain items. + // If nil then no filter is applied. + BlockRoots []phase0.Root +} diff --git a/services/chaindb/mock/service.go b/services/chaindb/mock/service.go index 7f1b3b2..972cfcc 100644 --- a/services/chaindb/mock/service.go +++ b/services/chaindb/mock/service.go @@ -596,6 +596,46 @@ func (s *service) Withdrawals(_ context.Context, _ *chaindb.WithdrawalFilter) ([ return []*chaindb.Withdrawal{}, nil } +// ConsolidationRequests provides consolidation requests according to the filter. +func (s *service) ConsolidationRequests(_ context.Context, _ *chaindb.ConsolidationRequestFilter) ([]*chaindb.ConsolidationRequest, error) { + return []*chaindb.ConsolidationRequest{}, nil +} + +// SetConsolidationRequests sets multiple consolidation requests. +func (s *service) SetConsolidationRequests(_ context.Context, _ []*chaindb.ConsolidationRequest) error { + return nil +} + +// SetConsolidationRequest sets a consolidation request. +func (s *service) SetConsolidationRequest(_ context.Context, _ *chaindb.ConsolidationRequest) error { + return nil +} + +// DepositRequests provides deposit requests according to the filter. +func (s *service) DepositRequests(_ context.Context, _ *chaindb.DepositRequestFilter) ([]*chaindb.DepositRequest, error) { + return []*chaindb.DepositRequest{}, nil +} + +// SetDepositRequests sets multiple deposit requests. +func (s *service) SetDepositRequests(_ context.Context, _ []*chaindb.DepositRequest) error { + return nil +} + +// SetDepositRequest sets a deposit request. +func (s *service) SetDepositRequest(_ context.Context, _ *chaindb.DepositRequest) error { + return nil +} + +// SetWithdrawalRequests sets multiple withdrawal requests. +func (s *service) SetWithdrawalRequests(_ context.Context, _ []*chaindb.WithdrawalRequest) error { + return nil +} + +// SetWithdrawalRequest sets a withdrawal request. +func (s *service) SetWithdrawalRequest(_ context.Context, _ *chaindb.WithdrawalRequest) error { + return nil +} + // BeginTx begins a transaction. func (s *service) BeginTx(_ context.Context) (context.Context, context.CancelFunc, error) { return nil, nil, nil diff --git a/services/chaindb/postgresql/consolidationrequests.go b/services/chaindb/postgresql/consolidationrequests.go new file mode 100644 index 0000000..9160ee3 --- /dev/null +++ b/services/chaindb/postgresql/consolidationrequests.go @@ -0,0 +1,163 @@ +// Copyright © 2025 Weald Technology Trading. +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package postgresql + +import ( + "context" + "fmt" + "sort" + "strings" + + "github.com/pkg/errors" + "github.com/wealdtech/chaind/services/chaindb" + "go.opentelemetry.io/otel" +) + +// ConsolidationRequests provides consolidation requests according to the filter. +func (s *Service) ConsolidationRequests(ctx context.Context, filter *chaindb.ConsolidationRequestFilter) ([]*chaindb.ConsolidationRequest, error) { + ctx, span := otel.Tracer("wealdtech.chaind.services.chaindb.postgresql").Start(ctx, "ConsolidationRequests") + defer span.End() + + tx := s.tx(ctx) + if tx == nil { + ctx, err := s.BeginROTx(ctx) + if err != nil { + return nil, errors.Wrap(err, "failed to begin transaction") + } + defer s.CommitROTx(ctx) + tx = s.tx(ctx) + } + + // Build the query. + queryBuilder := strings.Builder{} + queryVals := make([]interface{}, 0) + + queryBuilder.WriteString(` +SELECT f_block_root + ,f_slot + ,f_index + ,f_source_address + ,f_source_pubkey + ,f_target_pubkey +FROM t_block_consolidation_requests`) + + conditions := make([]string, 0) + + if filter.From != nil { + queryVals = append(queryVals, *filter.From) + conditions = append(conditions, fmt.Sprintf("f_slot >= $%d", len(queryVals))) + } + + if filter.To != nil { + queryVals = append(queryVals, *filter.To) + conditions = append(conditions, fmt.Sprintf("f_slot <= $%d", len(queryVals))) + } + + if len(filter.SourcePubkeys) > 0 { + sourcePubkeysBytes := make([][]byte, len(filter.SourcePubkeys)) + for i, pubkey := range filter.SourcePubkeys { + sourcePubkeysBytes[i] = pubkey[:] + } + queryVals = append(queryVals, sourcePubkeysBytes) + conditions = append(conditions, fmt.Sprintf("f_source_pubkey = ANY($%d)", len(queryVals))) + } + + if len(filter.TargetPubkeys) > 0 { + targetPubkeysBytes := make([][]byte, len(filter.TargetPubkeys)) + for i, pubkey := range filter.TargetPubkeys { + targetPubkeysBytes[i] = pubkey[:] + } + queryVals = append(queryVals, targetPubkeysBytes) + conditions = append(conditions, fmt.Sprintf("f_target_pubkey = ANY($%d)", len(queryVals))) + } + + if len(filter.BlockRoots) > 0 { + queryVals = append(queryVals, filter.BlockRoots) + queryBuilder.WriteString(fmt.Sprintf("f_block_root = ANY($%d)", len(queryVals))) + } + + if len(conditions) > 0 { + queryBuilder.WriteString("\nWHERE ") + queryBuilder.WriteString(strings.Join(conditions, " AND ")) + } + + switch filter.Order { + case chaindb.OrderEarliest: + queryBuilder.WriteString(` +ORDER BY f_slot, f_index`) + case chaindb.OrderLatest: + queryBuilder.WriteString(` +ORDER BY f_slot DESC, f_index DESC`) + default: + return nil, errors.New("no order specified") + } + + if filter.Limit > 0 { + queryVals = append(queryVals, filter.Limit) + queryBuilder.WriteString(fmt.Sprintf(` +LIMIT $%d`, len(queryVals))) + } + + if e := log.Trace(); e.Enabled() { + params := make([]string, len(queryVals)) + for i := range queryVals { + params[i] = fmt.Sprintf("%v", queryVals[i]) + } + e.Str("query", strings.ReplaceAll(queryBuilder.String(), "\n", " ")).Strs("params", params).Msg("SQL query") + } + + rows, err := tx.Query(ctx, + queryBuilder.String(), + queryVals..., + ) + if err != nil { + return nil, err + } + defer rows.Close() + + requests := make([]*chaindb.ConsolidationRequest, 0) + for rows.Next() { + request := &chaindb.ConsolidationRequest{} + var blockRoot []byte + var sourceAddress []byte + var sourcePubkey []byte + var targetPubkey []byte + err := rows.Scan( + &blockRoot, + &request.InclusionSlot, + &request.InclusionIndex, + &sourceAddress, + &sourcePubkey, + &targetPubkey, + ) + if err != nil { + return nil, errors.Wrap(err, "failed to scan row") + } + copy(request.InclusionBlockRoot[:], blockRoot) + copy(request.SourceAddress[:], sourceAddress) + copy(request.SourcePubkey[:], sourcePubkey) + copy(request.TargetPubkey[:], targetPubkey) + requests = append(requests, request) + } + + // Always return order of slot then index. + sort.Slice(requests, func(i int, j int) bool { + if requests[i].InclusionSlot != requests[j].InclusionSlot { + return requests[i].InclusionSlot < requests[j].InclusionSlot + } + return requests[i].InclusionIndex < requests[j].InclusionIndex + }) + + return requests, nil +} diff --git a/services/chaindb/postgresql/depositrequests.go b/services/chaindb/postgresql/depositrequests.go new file mode 100644 index 0000000..56d019a --- /dev/null +++ b/services/chaindb/postgresql/depositrequests.go @@ -0,0 +1,158 @@ +// Copyright © 2025 Weald Technology Trading. +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package postgresql + +import ( + "context" + "fmt" + "sort" + "strings" + + "github.com/pkg/errors" + "github.com/wealdtech/chaind/services/chaindb" + "go.opentelemetry.io/otel" +) + +// DepositRequests provides deposit requests according to the filter. +func (s *Service) DepositRequests(ctx context.Context, filter *chaindb.DepositRequestFilter) ([]*chaindb.DepositRequest, error) { + ctx, span := otel.Tracer("wealdtech.chaind.services.chaindb.postgresql").Start(ctx, "DepositRequests") + defer span.End() + + tx := s.tx(ctx) + if tx == nil { + ctx, err := s.BeginROTx(ctx) + if err != nil { + return nil, errors.Wrap(err, "failed to begin transaction") + } + defer s.CommitROTx(ctx) + tx = s.tx(ctx) + } + + // Build the query. + queryBuilder := strings.Builder{} + queryVals := make([]interface{}, 0) + + queryBuilder.WriteString(` +SELECT f_block_root + ,f_slot + ,f_index + ,f_pubkey + ,f_withdrawal_credentials + ,f_amount + ,f_signature + ,f_deposit_index +FROM t_block_deposit_requests`) + + conditions := make([]string, 0) + + if filter.From != nil { + queryVals = append(queryVals, *filter.From) + conditions = append(conditions, fmt.Sprintf("f_slot >= $%d", len(queryVals))) + } + + if filter.To != nil { + queryVals = append(queryVals, *filter.To) + conditions = append(conditions, fmt.Sprintf("f_slot <= $%d", len(queryVals))) + } + + if len(filter.ValidatorPubkeys) > 0 { + validatorPubkeysBytes := make([][]byte, len(filter.ValidatorPubkeys)) + for i, pubkey := range filter.ValidatorPubkeys { + validatorPubkeysBytes[i] = pubkey[:] + } + queryVals = append(queryVals, validatorPubkeysBytes) + conditions = append(conditions, fmt.Sprintf("f_pubkey = ANY($%d)", len(queryVals))) + } + + if len(filter.BlockRoots) > 0 { + queryVals = append(queryVals, filter.BlockRoots) + queryBuilder.WriteString(fmt.Sprintf("f_block_root = ANY($%d)", len(queryVals))) + } + + if len(conditions) > 0 { + queryBuilder.WriteString("\nWHERE ") + queryBuilder.WriteString(strings.Join(conditions, " AND ")) + } + + switch filter.Order { + case chaindb.OrderEarliest: + queryBuilder.WriteString(` +ORDER BY f_slot, f_index`) + case chaindb.OrderLatest: + queryBuilder.WriteString(` +ORDER BY f_slot DESC, f_index DESC`) + default: + return nil, errors.New("no order specified") + } + + if filter.Limit > 0 { + queryVals = append(queryVals, filter.Limit) + queryBuilder.WriteString(fmt.Sprintf(` +LIMIT $%d`, len(queryVals))) + } + + if e := log.Trace(); e.Enabled() { + params := make([]string, len(queryVals)) + for i := range queryVals { + params[i] = fmt.Sprintf("%v", queryVals[i]) + } + e.Str("query", strings.ReplaceAll(queryBuilder.String(), "\n", " ")).Strs("params", params).Msg("SQL query") + } + + rows, err := tx.Query(ctx, + queryBuilder.String(), + queryVals..., + ) + if err != nil { + return nil, err + } + defer rows.Close() + + requests := make([]*chaindb.DepositRequest, 0) + for rows.Next() { + request := &chaindb.DepositRequest{} + var blockRoot []byte + var pubkey []byte + var withdrawalCredentials []byte + var signature []byte + err := rows.Scan( + &blockRoot, + &request.InclusionSlot, + &request.InclusionIndex, + &pubkey, + &withdrawalCredentials, + &request.Amount, + &signature, + &request.Index, + ) + if err != nil { + return nil, errors.Wrap(err, "failed to scan row") + } + copy(request.InclusionBlockRoot[:], blockRoot) + copy(request.Pubkey[:], pubkey) + copy(request.WithdrawalCredentials[:], withdrawalCredentials) + copy(request.Signature[:], signature) + requests = append(requests, request) + } + + // Always return order of slot then index. + sort.Slice(requests, func(i int, j int) bool { + if requests[i].InclusionSlot != requests[j].InclusionSlot { + return requests[i].InclusionSlot < requests[j].InclusionSlot + } + return requests[i].InclusionIndex < requests[j].InclusionIndex + }) + + return requests, nil +} diff --git a/services/chaindb/service.go b/services/chaindb/service.go index f2dd67c..b013438 100644 --- a/services/chaindb/service.go +++ b/services/chaindb/service.go @@ -165,6 +165,12 @@ type ChainSpecSetter interface { SetChainSpecValue(ctx context.Context, key string, value any) error } +// ConsolidationRequestsProvider defines functions to access consolidation requests. +type ConsolidationRequestsProvider interface { + // ConsolidationRequests provides consolidation requests according to the filter. + ConsolidationRequests(ctx context.Context, filter *ConsolidationRequestFilter) ([]*ConsolidationRequest, error) +} + // ConsolidationRequestsSetter defines functions to create and update consolidation requests. type ConsolidationRequestsSetter interface { // SetConsolidationRequests sets consolidation requests. @@ -174,6 +180,12 @@ type ConsolidationRequestsSetter interface { SetConsolidationRequest(ctx context.Context, request *ConsolidationRequest) error } +// DepositRequestsProvider defines functions to access deposit requests. +type DepositRequestsProvider interface { + // DepositRequests provides deposit requests according to the filter. + DepositRequests(ctx context.Context, filter *DepositRequestFilter) ([]*DepositRequest, error) +} + // DepositRequestsSetter defines functions to create and update deposit requests. type DepositRequestsSetter interface { // SetDepositRequests sets deposit requests. From 83a667d1c1b191654a5f7a3cb7f509084b3726dc Mon Sep 17 00:00:00 2001 From: Ahmed Mohamed Date: Tue, 20 May 2025 16:22:19 +1000 Subject: [PATCH 2/2] Use any instead of interface --- services/chaindb/postgresql/consolidationrequests.go | 2 +- services/chaindb/postgresql/depositrequests.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/services/chaindb/postgresql/consolidationrequests.go b/services/chaindb/postgresql/consolidationrequests.go index 9160ee3..e8642b1 100644 --- a/services/chaindb/postgresql/consolidationrequests.go +++ b/services/chaindb/postgresql/consolidationrequests.go @@ -41,7 +41,7 @@ func (s *Service) ConsolidationRequests(ctx context.Context, filter *chaindb.Con // Build the query. queryBuilder := strings.Builder{} - queryVals := make([]interface{}, 0) + queryVals := make([]any, 0) queryBuilder.WriteString(` SELECT f_block_root diff --git a/services/chaindb/postgresql/depositrequests.go b/services/chaindb/postgresql/depositrequests.go index 56d019a..cea7b17 100644 --- a/services/chaindb/postgresql/depositrequests.go +++ b/services/chaindb/postgresql/depositrequests.go @@ -41,7 +41,7 @@ func (s *Service) DepositRequests(ctx context.Context, filter *chaindb.DepositRe // Build the query. queryBuilder := strings.Builder{} - queryVals := make([]interface{}, 0) + queryVals := make([]any, 0) queryBuilder.WriteString(` SELECT f_block_root