impl 3 task

master
plazmoid 5 days ago
parent b57e1ce731
commit 98cb8daa41
  1. 7788
      grpc-3/Cargo.lock
  2. 16
      grpc-3/Cargo.toml
  3. 12
      grpc-3/config.yaml
  4. 247
      grpc-3/src/main.rs
  5. 2
      mass-transfer-2/Cargo.toml

7788
grpc-3/Cargo.lock generated

File diff suppressed because it is too large Load Diff

@ -0,0 +1,16 @@
[package]
name = "grpc-3"
version = "0.1.0"
edition = "2024"
[dependencies]
bs58 = "0.5.1"
config = "0.15.11"
futures = "0.3.31"
serde = { version = "1.0.219", features = ["derive"] }
serde_json = "1.0.140"
solana-client = "2.2.7"
solana-sdk = "2.2.2"
tokio = { version = "1.45.0", features = ["macros", "rt-multi-thread"] }
yellowstone-grpc-client = "6.1.0"
yellowstone-grpc-proto = "6.1.0"

@ -0,0 +1,12 @@
transfers:
- sender:
pubkey: 6VKVAHy2GcQ8EexKgp4RPRQMEW5j12k7bjHFB8acPe8e
privkey: GiLKdLTczFTD5ZQKmcWJHmMz2LYyFVHRznezoHmsaD9p
receiver: DyoR8dJyoZ983TpHBEH6uDzBsqgEAEkFjvK19nKYDtKi
- sender:
pubkey: HP9QKWx6dhCFQm2XX3CbGdyutL3MUeDrozcQNuf7J8rc
privkey: DA3bffxJouzoxwRxS6dkrLRMC4i3W86zNoA6kPfgSYTq
receiver: AFUr4kCcxx9BFUqJ3qtbNZNzWiU99PTC8mKr5spdkJHN
solana_node_url: https://api.devnet.solana.com
solana_grpc_url: https://grpc.ny.shyft.to
solana_grpc_token: b2b972c6-fff2-4b5c-aac9-375c6984b80e

@ -0,0 +1,247 @@
use std::{collections::HashMap, error::Error, str::FromStr, sync::Arc};
use config::Config;
use futures::{SinkExt, StreamExt, future::join_all};
use serde::{Deserialize, de};
use solana_client::{client_error::ClientError, nonblocking::rpc_client::RpcClient};
use solana_sdk::{
commitment_config::CommitmentConfig,
native_token::LAMPORTS_PER_SOL,
pubkey::Pubkey,
signature::{Keypair, Signature},
signer::Signer,
system_instruction,
transaction::Transaction,
};
use yellowstone_grpc_client::{ClientTlsConfig, GeyserGrpcClient};
use yellowstone_grpc_proto::geyser::{
CommitmentLevel, SubscribeRequest, SubscribeRequestFilterBlocksMeta,
subscribe_update::UpdateOneof,
};
#[derive(Deserialize)]
struct Cfg {
transfers: Vec<CfgTransfer>,
solana_node_url: String,
solana_grpc_url: String,
solana_grpc_token: String,
}
#[derive(Deserialize)]
struct CfgTransfer {
#[serde(deserialize_with = "deser_keypair")]
sender: Keypair,
#[serde(deserialize_with = "deser_pubkey")]
receiver: Pubkey,
}
fn deser_keypair<'de, D>(deserializer: D) -> Result<Keypair, D::Error>
where
D: de::Deserializer<'de>,
{
#[derive(Deserialize)]
struct CfgKeyPair {
pubkey: String,
privkey: String,
}
let pair: CfgKeyPair = de::Deserialize::deserialize(deserializer)?;
let privkey = bs58::decode(pair.privkey)
.into_vec()
.map_err(de::Error::custom)?;
let pubkey = bs58::decode(pair.pubkey)
.into_vec()
.map_err(de::Error::custom)?;
let mut bytes = [0u8; 64];
bytes[..32].copy_from_slice(&privkey);
bytes[32..].copy_from_slice(&pubkey);
Keypair::from_bytes(&bytes).map_err(de::Error::custom)
}
pub fn deser_pubkey<'de, D>(deserializer: D) -> Result<Pubkey, D::Error>
where
D: de::Deserializer<'de>,
{
let s: String = de::Deserialize::deserialize(deserializer)?;
Pubkey::from_str(&s).map_err(de::Error::custom)
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
let config = {
let config_parser = Config::builder()
.add_source(config::File::with_name("config.yaml"))
.build()?;
config_parser.try_deserialize::<Cfg>()?
};
let client = Arc::new(RpcClient::new_with_commitment(
config.solana_node_url.clone(),
CommitmentConfig::confirmed(),
));
let mut grpc_client = GeyserGrpcClient::build_from_shared(config.solana_grpc_url)?
.x_token(Some(config.solana_grpc_token))
.unwrap()
.tls_config(ClientTlsConfig::new().with_native_roots())?
.connect()
.await?;
let accounts_from = config
.transfers
.iter()
.map(|t| t.sender.pubkey())
.collect::<Vec<_>>();
let accounts_to = config
.transfers
.iter()
.map(|s| s.receiver)
.collect::<Vec<_>>();
//**// create account
// create_account(client.clone(), &config.transfers[0].sender, &Keypair::new()).await?;
// return Ok(());
//**//
let all_accounts = accounts_from.iter().chain(&accounts_to);
print_balances(client.clone(), all_accounts.clone()).await?;
let (mut subscribe_tx, mut stream) = grpc_client.subscribe().await?;
subscribe_tx
.send(SubscribeRequest {
accounts: HashMap::new(),
slots: HashMap::new(),
transactions: HashMap::new(),
transactions_status: HashMap::new(),
blocks: HashMap::new(),
blocks_meta: HashMap::from([(String::new(), SubscribeRequestFilterBlocksMeta {})]),
entry: HashMap::new(),
commitment: Some(CommitmentLevel::Confirmed as i32),
accounts_data_slice: vec![],
ping: None,
from_slot: None,
})
.await?;
while let Some(msg) = stream.next().await {
let msg = msg?;
match msg.update_oneof {
Some(UpdateOneof::BlockMeta(bm)) => {
println!(
"Received new block at {}, starting transaction",
bm.block_height.unwrap().block_height
);
print_balances(client.clone(), all_accounts.clone()).await?;
perform_transaction(client.clone(), &config.transfers).await?;
print_balances(client.clone(), all_accounts.clone()).await?;
}
o => println!("skipped {o:?}"),
}
}
Ok(())
}
async fn wait_for_confirmation(
client: Arc<RpcClient>,
signatures: &[Signature],
) -> Result<(), ClientError> {
for sgn in signatures {
'conf: loop {
let confirmed = client.confirm_transaction(sgn).await?;
if confirmed {
break 'conf;
}
}
}
Ok(())
}
async fn perform_transaction(
client: Arc<RpcClient>,
transfers: &[CfgTransfer],
) -> Result<(), ClientError> {
let transfer_amount = LAMPORTS_PER_SOL / 100;
let transactions = run_batch(transfers.iter().map(|tr| {
let instruction =
system_instruction::transfer(&tr.sender.pubkey(), &tr.receiver, transfer_amount);
let mut transaction =
Transaction::new_with_payer(&[instruction], Some(&tr.sender.pubkey()));
let client = client.clone();
println!(
"sending {} SOL from {} to {}",
transfer_amount as f64 / LAMPORTS_PER_SOL as f64,
tr.sender.pubkey(),
tr.receiver
);
async move {
let blockhash = client.get_latest_blockhash().await?;
transaction.sign(&[&tr.sender], blockhash);
client.send_transaction(&transaction).await
}
}))
.await?;
println!("Pending transactions: \n{transactions:#?}");
println!("Waiting for confirmation...");
wait_for_confirmation(client.clone(), &transactions).await?;
Ok(())
}
async fn create_account(
client: Arc<RpcClient>,
payer: &Keypair,
new_account: &Keypair,
) -> Result<(), ClientError> {
let data_len = 0;
let rent_exemption_amount = client
.get_minimum_balance_for_rent_exemption(data_len)
.await?;
println!("pubkey: {}", new_account.pubkey());
println!(
"privkey: {}",
bs58::encode(new_account.secret().as_bytes()).into_string()
);
let instruction = system_instruction::create_account(
&payer.pubkey(),
&new_account.pubkey(),
rent_exemption_amount,
data_len as u64,
&Pubkey::from_str("11111111111111111111111111111111").unwrap(),
);
let mut transaction = Transaction::new_with_payer(&[instruction], Some(&payer.pubkey()));
transaction.sign(
&[&payer, &new_account],
client.get_latest_blockhash().await?,
);
client.send_and_confirm_transaction(&transaction).await?;
println!("account {} created", new_account.pubkey());
Ok(())
}
async fn run_batch<T>(
tasks: impl IntoIterator<Item = impl Future<Output = Result<T, ClientError>>>,
) -> Result<Vec<T>, ClientError> {
join_all(tasks).await.into_iter().collect()
}
async fn print_balances(
client: Arc<RpcClient>,
accounts: impl IntoIterator<Item = &Pubkey> + Clone,
) -> Result<(), ClientError> {
let balances = run_batch(accounts.clone().into_iter().map(|acc| {
let client = client.clone();
async move {
let balance = client.get_balance(acc).await?;
Ok(balance as f64 / LAMPORTS_PER_SOL as f64)
}
}))
.await?;
for (acc, bal) in accounts.into_iter().zip(balances) {
println!("{acc}: {bal} SOL");
}
Ok(())
}

@ -11,4 +11,4 @@ serde = { version = "1.0.219", features = ["derive"] }
serde_json = "1.0.140"
solana-client = "2.2.7"
solana-sdk = "2.2.2"
tokio = { version = "1.45.0", features = ["macros", "rt-multi-thread"] }
tokio = { version = "1.45.0", features = ["macros", "rt-multi-thread"]}

Loading…
Cancel
Save