Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions api/api-app/src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,8 @@ app:
url: http://opex-market
opex-bc-gateway:
url: http://opex-bc-gateway
storage:
url: http://storage
auth:
cert-url: http://keycloak:8080/realms/opex/protocol/openid-connect/certs
iss-url: ${TOKEN_ISSUER_URL:http://keycloak:8080/realms/opex}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package co.nilin.opex.api.core.spi

import org.springframework.http.ResponseEntity
import org.springframework.http.codec.multipart.FilePart

interface StorageProxy {
suspend fun adminDownload(token: String, bucket: String, key: String): ResponseEntity<ByteArray>
suspend fun adminUpload(token: String, bucket: String, key: String, file: FilePart,isPublic : Boolean? = false)
suspend fun adminDelete(token: String, bucket: String, key: String)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package co.nilin.opex.api.ports.opex.controller

import co.nilin.opex.api.core.spi.StorageProxy
import co.nilin.opex.api.ports.opex.util.jwtAuthentication
import co.nilin.opex.api.ports.opex.util.tokenValue
import org.springframework.http.ResponseEntity
import org.springframework.http.codec.multipart.FilePart
import org.springframework.security.core.annotation.CurrentSecurityContext
import org.springframework.security.core.context.SecurityContext
import org.springframework.web.bind.annotation.*

@RestController
@RequestMapping("/opex/v1/admin/storage")
class StorageAdminController(
private val storageProxy: StorageProxy,
) {
@GetMapping
suspend fun download(
@CurrentSecurityContext securityContext: SecurityContext,
@RequestParam("bucket") bucket: String,
@RequestParam("key") key: String,
): ResponseEntity<ByteArray> {
return storageProxy.adminDownload(securityContext.jwtAuthentication().tokenValue(), bucket, key)
}

@PostMapping
suspend fun upload(
@CurrentSecurityContext securityContext: SecurityContext,
@RequestParam("bucket") bucket: String,
@RequestParam("key") key: String,
@RequestPart("file") file: FilePart,
@RequestParam("isPublic") isPublic: Boolean? = false,
) {
storageProxy.adminUpload(securityContext.jwtAuthentication().tokenValue(), bucket, key, file, isPublic)
}

@DeleteMapping
suspend fun delete(
@CurrentSecurityContext securityContext: SecurityContext,
@RequestParam("bucket") bucket: String,
@RequestParam("key") key: String,
) {
storageProxy.adminDelete(securityContext.jwtAuthentication().tokenValue(), bucket, key)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
package co.nilin.opex.api.ports.proxy.impl

import co.nilin.opex.api.core.spi.StorageProxy
import co.nilin.opex.common.utils.LoggerDelegate
import kotlinx.coroutines.reactive.awaitSingle
import org.springframework.beans.factory.annotation.Qualifier
import org.springframework.beans.factory.annotation.Value
import org.springframework.http.HttpHeaders
import org.springframework.http.MediaType
import org.springframework.http.ResponseEntity
import org.springframework.http.codec.multipart.FilePart
import org.springframework.stereotype.Component
import org.springframework.util.LinkedMultiValueMap
import org.springframework.web.reactive.function.BodyInserters
import org.springframework.web.reactive.function.client.WebClient
import org.springframework.web.reactive.function.client.awaitBodilessEntity
import reactor.core.publisher.Mono

@Component
class StorageProxyImpl(@Qualifier("generalWebClient") private val webClient: WebClient) : StorageProxy {

private val logger by LoggerDelegate()

@Value("\${app.storage.url}")
private lateinit var baseUrl: String

override suspend fun adminDownload(
token: String,
bucket: String,
key: String
): ResponseEntity<ByteArray> {
return webClient.get()
.uri("$baseUrl/v2/admin") {
it.queryParam("bucket", bucket)
it.queryParam("key", key)
it.build()
}
.header(HttpHeaders.AUTHORIZATION, "Bearer $token")
.accept(
MediaType.APPLICATION_OCTET_STREAM,
MediaType.APPLICATION_JSON
)
.exchangeToMono { response ->
if (response.statusCode().isError) {
response.createException().flatMap { Mono.error(it) }
} else {
response.toEntity(ByteArray::class.java)
}
}
.awaitSingle()
}

override suspend fun adminUpload(
token: String,
bucket: String,
key: String,
file: FilePart,
isPublic : Boolean?
) {
webClient.post()
.uri("$baseUrl/v2/admin"){
it.queryParam("isPublic", isPublic)
it.queryParam("bucket", bucket)
it.queryParam("key", key)
it.build()
}
.header(HttpHeaders.AUTHORIZATION, "Bearer $token")
.contentType(MediaType.MULTIPART_FORM_DATA)
.body(
BodyInserters.fromMultipartData(
LinkedMultiValueMap<String, Any>().apply {
add("file", file)
}
))
.retrieve()
.onStatus({ it.isError }) { it.createException() }
.awaitBodilessEntity()
}

override suspend fun adminDelete(
token: String,
bucket: String,
key: String
) {
webClient.delete()
.uri("$baseUrl/v2/admin") {
it.queryParam("bucket", bucket)
it.queryParam("key", key)
it.build()
}
.accept(MediaType.APPLICATION_JSON)
.header(HttpHeaders.AUTHORIZATION, "Bearer $token")
.retrieve()
.onStatus({ it.isError }) { it.createException() }
.awaitBodilessEntity()
}
}
4 changes: 3 additions & 1 deletion docker-compose-otc.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@ services:
- VAULT_HOST=vault
- SWAGGER_AUTH_URL=$KEYCLOAK_FRONTEND_URL
- DRIVE_FOLDER_ID=$DRIVE_FOLDER_ID
- BACKUP_ENABLED=$WALLET_BACKUP_ENABLED
- STORAGE_FOLDER_ID=$STORAGE_FOLDER_ID
- BACKUP_ENABLED_STORAGE=$WALLET_BACKUP_ENABLED_STORAGE
- BACKUP_ENABLED_GOOGLE_DRIVE=$WALLET_BACKUP_ENABLED_GOOGLE_DRIVE
- SPRING_PROFILES_ACTIVE=otc
- AUTH_URL=${AUTH_URL}
- AUTH_JWK_ENDPOINT=${JWK_ENDPOINT}
Expand Down
4 changes: 3 additions & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -428,7 +428,9 @@ services:
- VAULT_HOST=vault
- SWAGGER_AUTH_URL=$KEYCLOAK_FRONTEND_URL
- DRIVE_FOLDER_ID=$DRIVE_FOLDER_ID
- BACKUP_ENABLED=$WALLET_BACKUP_ENABLED
- STORAGE_FOLDER_ID=$STORAGE_FOLDER_ID
- BACKUP_ENABLED_STORAGE=$WALLET_BACKUP_ENABLED_STORAGE
- BACKUP_ENABLED_GOOGLE_DRIVE=$WALLET_BACKUP_ENABLED_GOOGLE_DRIVE
- SYMBOLS=BTC_USDT,ETH_USDT,BTC_IRT,ETH_IRT,USDT_IRT,ETH_BUSD,BTC_BUSD,BNB_BUSD
- WITHDRAW_LIMIT_ENABLED=${WITHDRAW_LIMIT_ENABLED}
- WITHDRAW_OTP_REQUIRED_COUNT=${WITHDRAW_OTP_REQUIRED_COUNT}
Expand Down
6 changes: 6 additions & 0 deletions wallet/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
<module>wallet-ports/wallet-eventlistener-kafka</module>
<module>wallet-ports/wallet-bcgateway-proxy</module>
<module>wallet-ports/wallet-profile-proxy</module>
<module>wallet-ports/wallet-storage-proxy</module>
<module>wallet-ports/wallet-auth-proxy</module>
<module>wallet-ports/wallet-accountant-proxy</module>
<module>wallet-ports/wallet-otp-proxy</module>
Expand Down Expand Up @@ -60,6 +61,11 @@
<artifactId>wallet-profile-proxy</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>co.nilin.opex.wallet.ports.storage</groupId>
<artifactId>wallet-storage-proxy</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>co.nilin.opex.wallet.ports.auth</groupId>
<artifactId>wallet-auth-proxy</artifactId>
Expand Down
4 changes: 4 additions & 0 deletions wallet/wallet-app/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,10 @@
<groupId>co.nilin.opex.wallet.ports.profile</groupId>
<artifactId>wallet-profile-proxy</artifactId>
</dependency>
<dependency>
<groupId>co.nilin.opex.wallet.ports.storage</groupId>
<artifactId>wallet-storage-proxy</artifactId>
</dependency>
<dependency>
<groupId>co.nilin.opex.wallet.ports.auth</groupId>
<artifactId>wallet-auth-proxy</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package co.nilin.opex.wallet.app.service

import co.nilin.opex.wallet.core.model.BriefWallet
import co.nilin.opex.wallet.core.model.WalletOwner
import co.nilin.opex.wallet.core.spi.StorageProxy
import co.nilin.opex.wallet.core.spi.WalletManager
import co.nilin.opex.wallet.core.spi.WalletOwnerManager
import com.fasterxml.jackson.databind.ObjectMapper
Expand All @@ -12,88 +13,109 @@ import com.google.api.services.drive.Drive
import com.google.api.services.drive.DriveScopes
import com.google.auth.http.HttpCredentialsAdapter
import com.google.auth.oauth2.GoogleCredentials
import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext
import org.slf4j.LoggerFactory
import org.springframework.beans.factory.annotation.Value
import org.springframework.context.annotation.Profile
import org.springframework.http.MediaType
import org.springframework.scheduling.annotation.Scheduled
import org.springframework.stereotype.Service
import java.io.File
import java.nio.file.Files
import java.time.LocalDateTime
import java.util.concurrent.Executors
import java.time.Instant
import com.google.api.services.drive.model.File as GoogleFile

@Service
@Profile("!test")
class BackupService(
private val walletManager: WalletManager,
private val walletOwnerManager: WalletOwnerManager,
@Value("\${app.backup.enabled}")
private val isBackupEnabled: Boolean,
@Value("\${app.backup.drive.folder}")
private val folderId: String
private val storageProxy: StorageProxy,
@Value("\${app.backup.storage.enabled}") private val storageEnabled: Boolean,
@Value("\${app.backup.google-drive.enabled}") private val driveEnabled: Boolean,
@Value("\${app.backup.storage.folder}") private val storageFolderId: String,
@Value("\${app.backup.google-drive.folder}") private val driveFolderId: String
) {

private val logger = LoggerFactory.getLogger(BackupService::class.java)
private val dispatcher = Executors.newFixedThreadPool(4).asCoroutineDispatcher()
private val mapper = ObjectMapper()

data class OwnerAndWallets(val owner: WalletOwner, val wallets: List<BriefWallet>)

@Scheduled(initialDelay = 60000, fixedDelay = 1000 * 60 * 10)
private fun backup() {
if (!isBackupEnabled) return
runBlocking(dispatcher) {
try {
val data = arrayListOf<OwnerAndWallets>()

logger.info("Fetching wallets for backup...")
walletOwnerManager.findAllWalletOwners().forEach {
data.add(OwnerAndWallets(it, walletManager.findAllWalletsBriefNotZero(it.id!!)))
}

logger.info("Writing wallets to temp file")
val file = writeWalletsToFile(data)
upload(file, file.name, folderId)
} catch (e: Exception) {
logger.error("Could not upload file to google drive", e)
}
}
}
private val logger = LoggerFactory.getLogger(javaClass)
private val mapper = ObjectMapper().findAndRegisterModules()

private fun writeWalletsToFile(wallets: List<OwnerAndWallets>): File {
val fileName = LocalDateTime.now().toString()
val tempFile = Files.createTempFile(fileName, ".json").toFile()
mapper.writeValue(tempFile, wallets)
return tempFile
}
data class OwnerAndWallets(
val owner: WalletOwner,
val wallets: List<BriefWallet>
)

@Scheduled(initialDelay = 60_000, fixedDelay = 10 * 60 * 1000)
fun backup() = runBlocking {
if (!storageEnabled && !driveEnabled) return@runBlocking

val fileName = "wallets-${Instant.now().toEpochMilli()}.json"
logger.info("Starting wallet backup: $fileName")

private fun upload(file: File, fileName: String, folderId: String) {
try {
logger.info("Start upload process with folderId:$folderId")
val authFile = File("/drive-key.json")
val credentials = GoogleCredentials.fromStream(authFile.inputStream()).createScoped(DriveScopes.DRIVE)
val requestInitializer = HttpCredentialsAdapter(credentials)

val service = Drive.Builder(NetHttpTransport(), GsonFactory.getDefaultInstance(), requestInitializer)
.setApplicationName("Wallet backup")
.build()

val metadata = GoogleFile().apply {
name = fileName
parents = listOf(folderId)
val tempFile = writeBackupToTempFile()

if (storageEnabled) {
storageProxy.systemUploadFile(
bucket = storageFolderId,
key = fileName,
file = tempFile,
)
}
val content = FileContent("text/*", file)

val uploaded = service.files().create(metadata, content)
.setFields("id,name")
.execute()
logger.info("File uploaded: ${uploaded.id}--${uploaded.name}")
if (driveEnabled) {
uploadToGoogleDrive(tempFile, fileName)
}

tempFile.delete()
} catch (e: Exception) {
logger.error("Wallet backup is enabled but could not upload to Google Drive", e)
logger.error("Wallet backup failed", e)
}
}

}
private suspend fun writeBackupToTempFile(): File =
withContext(Dispatchers.IO) {
val file = Files.createTempFile("wallets-", ".json").toFile()
val data = walletOwnerManager.findAllWalletOwners().map { owner ->
OwnerAndWallets(
owner,
walletManager.findAllWalletsBriefNotZero(owner.id!!)
)
}
mapper.writeValue(file, data)
file
}

private fun uploadToGoogleDrive(file: File, fileName: String) {
logger.info("Uploading backup to Google Drive")

val credentials = GoogleCredentials
.fromStream(File("/drive-key.json").inputStream())
.createScoped(DriveScopes.DRIVE)

val drive = Drive.Builder(
NetHttpTransport(),
GsonFactory.getDefaultInstance(),
HttpCredentialsAdapter(credentials)
)
.setApplicationName("Wallet backup")
.build()

val metadata = GoogleFile().apply {
name = fileName
parents = listOf(driveFolderId)
}

drive.files().create(
metadata,
FileContent(MediaType.APPLICATION_JSON_VALUE, file)
)
.setFields("id,name")
.execute()
.also {
logger.info("Uploaded to Drive: ${it.id}")
}
}
}
Loading
Loading