Rust On-Chain Program Integration
This example demonstrates comprehensive on-chain program integration using the Rust DLMM SDK. It covers Cross Program Invocations (CPI), Program Derived Addresses (PDA), and advanced state management patterns.
Overview​
On-chain program integration enables sophisticated DeFi applications that can interact with multiple Solana programs atomically. This example shows how to build a professional-grade integration system that can manage complex on-chain state and execute multi-step operations.
Project Setup​
Create a new Rust project with comprehensive Solana dependencies:
[package]
name = "onchain-integration"
version = "0.1.0"
edition = "2021"
[dependencies]
saros-dlmm-sdk-rs = "0.1.0"
solana-sdk = "1.18.0"
solana-client = "1.18.0"
solana-program = "1.18.0"
solana-account-decoder = "1.18.0"
anchor-lang = "0.29.0"
anchor-spl = "0.29.0"
spl-token = "4.0.0"
spl-associated-token-account = "2.3.0"
borsh = "0.10.3"
borsh-derive = "0.10.3"
tokio = { version = "1.0", features = ["full"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
anyhow = "1.0"
thiserror = "1.0"
log = "0.4"
env_logger = "0.10"
chrono = { version = "0.4", features = ["serde"] }
uuid = { version = "1.0", features = ["v4"] }
dashmap = "5.5"
once_cell = "1.19"
async-trait = "0.1"
bincode = "1.3"
bs58 = "0.5"
sha2 = "0.10"
rand = "0.8"
parking_lot = "0.12"
futures = "0.3"
Core Integration Engine​
use anchor_lang::prelude::*;
use anchor_spl::token::{self, Token, TokenAccount};
use anyhow::{anyhow, Result};
use borsh::{BorshDeserialize, BorshSerialize};
use chrono::{DateTime, Utc};
use dashmap::DashMap;
use parking_lot::RwLock;
use serde::{Deserialize, Serialize};
use solana_client::rpc_client::RpcClient;
use solana_program::{
instruction::{AccountMeta, Instruction},
program_pack::Pack,
pubkey::Pubkey,
system_instruction,
sysvar,
};
use solana_sdk::{
commitment_config::CommitmentConfig,
signature::{Keypair, Signature},
signer::Signer,
transaction::Transaction,
};
use spl_token::state::{Account as TokenAccount, Mint};
use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::time::{interval, sleep};
use uuid::Uuid;
// Program Integration Manager
pub struct OnChainIntegrationManager {
rpc_client: Arc<RpcClient>,
program_registry: Arc<ProgramRegistry>,
state_manager: Arc<StateManager>,
instruction_builder: Arc<InstructionBuilder>,
transaction_executor: Arc<TransactionExecutor>,
performance_monitor: Arc<RwLock<IntegrationMetrics>>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ProgramIntegrationConfig {
pub program_id: Pubkey,
pub authority: Pubkey,
pub max_accounts_per_instruction: u8,
pub enable_cpi: bool,
pub enable_pda_creation: bool,
pub gas_optimization: bool,
}
#[derive(Debug, Clone, BorshSerialize, BorshDeserialize)]
pub struct IntegrationState {
pub authority: Pubkey,
pub total_operations: u64,
pub last_updated: i64,
pub active_pdas: Vec<Pubkey>,
pub program_data: HashMap<String, Vec<u8>>,
}
#[derive(Debug)]
pub struct CrossProgramInvocation {
pub target_program: Pubkey,
pub instruction_data: Vec<u8>,
pub accounts: Vec<AccountMeta>,
pub signers: Vec<Keypair>,
}
#[derive(Debug, Default)]
struct IntegrationMetrics {
pub total_cpi_calls: u64,
pub successful_integrations: u64,
pub failed_integrations: u64,
pub average_execution_time: Duration,
pub pda_creations: u64,
pub state_updates: u64,
}
impl OnChainIntegrationManager {
pub async fn new(
rpc_url: &str,
program_configs: Vec<ProgramIntegrationConfig>,
) -> Result<Self> {
let rpc_client = Arc::new(RpcClient::new_with_commitment(
rpc_url.to_string(),
CommitmentConfig::confirmed(),
));
let program_registry = Arc::new(ProgramRegistry::new(program_configs));
let state_manager = Arc::new(StateManager::new(rpc_client.clone()));
let instruction_builder = Arc::new(InstructionBuilder::new(program_registry.clone()));
let transaction_executor = Arc::new(TransactionExecutor::new(rpc_client.clone()));
let performance_monitor = Arc::new(RwLock::new(IntegrationMetrics::default()));
Ok(Self {
rpc_client,
program_registry,
state_manager,
instruction_builder,
transaction_executor,
performance_monitor,
})
}
pub async fn execute_cross_program_operation(
&self,
operation: CrossProgramOperation,
authority: &Keypair,
) -> Result<IntegrationResult> {
let start_time = Instant::now();
let operation_id = Uuid::new_v4();
log::info!("Executing cross-program operation: {}", operation_id);
// Validate operation
self.validate_operation(&operation).await?;
// Build instructions
let instructions = self.build_operation_instructions(&operation, authority).await?;
// Execute atomically
let result = self.execute_atomic_transaction(instructions, authority).await?;
// Update state
self.update_integration_state(&operation, &result).await?;
// Update metrics
let mut metrics = self.performance_monitor.write();
metrics.total_cpi_calls += operation.cpi_calls.len() as u64;
metrics.average_execution_time =
(metrics.average_execution_time + start_time.elapsed()) / 2;
if result.success {
metrics.successful_integrations += 1;
} else {
metrics.failed_integrations += 1;
}
Ok(IntegrationResult {
operation_id,
signature: result.signature,
success: result.success,
execution_time: start_time.elapsed(),
state_changes: result.state_changes,
created_pdas: result.created_pdas,
})
}
async fn build_operation_instructions(
&self,
operation: &CrossProgramOperation,
authority: &Keypair,
) -> Result<Vec<Instruction>> {
let mut instructions = Vec::new();
// Create necessary PDAs first
for pda_config in &operation.required_pdas {
let pda_instruction = self.create_pda_instruction(pda_config, authority).await?;
instructions.push(pda_instruction);
}
// Build main operation instructions
for cpi_call in &operation.cpi_calls {
let cpi_instruction = self.instruction_builder
.build_cpi_instruction(cpi_call, authority).await?;
instructions.push(cpi_instruction);
}
// Add cleanup instructions if needed
if let Some(cleanup) = &operation.cleanup_operations {
for cleanup_op in cleanup {
let cleanup_instruction = self.instruction_builder
.build_cleanup_instruction(cleanup_op, authority).await?;
instructions.push(cleanup_instruction);
}
}
Ok(instructions)
}
async fn execute_atomic_transaction(
&self,
instructions: Vec<Instruction>,
authority: &Keypair,
) -> Result<ExecutionResult> {
let recent_blockhash = self.rpc_client
.get_latest_blockhash()
.map_err(|e| anyhow!("Failed to get recent blockhash: {}", e))?;
let transaction = Transaction::new_signed_with_payer(
&instructions,
Some(&authority.pubkey()),
&[authority],
recent_blockhash,
);
log::info!("Submitting atomic transaction with {} instructions", instructions.len());
match self.transaction_executor.execute_transaction(&transaction).await {
Ok(signature) => {
// Wait for confirmation
self.wait_for_confirmation(&signature).await?;
Ok(ExecutionResult {
signature,
success: true,
state_changes: self.extract_state_changes(&transaction).await?,
created_pdas: self.extract_created_pdas(&transaction).await?,
})
}
Err(e) => {
log::error!("Transaction execution failed: {}", e);
Ok(ExecutionResult {
signature: Signature::default(),
success: false,
state_changes: Vec::new(),
created_pdas: Vec::new(),
})
}
}
}
}
// Program Registry for managing multiple program integrations
pub struct ProgramRegistry {
programs: DashMap<Pubkey, ProgramIntegrationConfig>,
interface_cache: Arc<RwLock<HashMap<Pubkey, ProgramInterface>>>,
}
#[derive(Debug, Clone)]
pub struct ProgramInterface {
pub program_id: Pubkey,
pub instructions: HashMap<String, InstructionTemplate>,
pub account_layouts: HashMap<String, AccountLayout>,
pub error_codes: HashMap<u32, String>,
}
#[derive(Debug, Clone)]
pub struct InstructionTemplate {
pub name: String,
pub discriminator: Vec<u8>,
pub accounts: Vec<AccountTemplate>,
pub data_layout: Vec<DataField>,
}
#[derive(Debug, Clone)]
pub struct AccountTemplate {
pub name: String,
pub is_mut: bool,
pub is_signer: bool,
pub is_optional: bool,
}
#[derive(Debug, Clone)]
pub struct AccountLayout {
pub discriminator: Vec<u8>,
pub fields: Vec<DataField>,
pub size: usize,
}
#[derive(Debug, Clone)]
pub struct DataField {
pub name: String,
pub field_type: FieldType,
pub offset: usize,
pub size: usize,
}
#[derive(Debug, Clone)]
pub enum FieldType {
U8,
U16,
U32,
U64,
I8,
I16,
I32,
I64,
F32,
F64,
Bool,
Pubkey,
String,
Bytes,
Array(Box<FieldType>, usize),
}
impl ProgramRegistry {
pub fn new(configs: Vec<ProgramIntegrationConfig>) -> Self {
let programs = DashMap::new();
for config in configs {
programs.insert(config.program_id, config);
}
Self {
programs,
interface_cache: Arc::new(RwLock::new(HashMap::new())),
}
}
pub async fn register_program(&self, config: ProgramIntegrationConfig) -> Result<()> {
log::info!("Registering program: {}", config.program_id);
// Load program interface
let interface = self.load_program_interface(&config.program_id).await?;
self.interface_cache.write().insert(config.program_id, interface);
self.programs.insert(config.program_id, config);
Ok(())
}
pub fn get_program_config(&self, program_id: &Pubkey) -> Option<ProgramIntegrationConfig> {
self.programs.get(program_id).map(|entry| entry.clone())
}
pub fn get_program_interface(&self, program_id: &Pubkey) -> Option<ProgramInterface> {
self.interface_cache.read().get(program_id).cloned()
}
async fn load_program_interface(&self, program_id: &Pubkey) -> Result<ProgramInterface> {
// In production, this would parse the program's IDL or use reflection
// For this example, we'll create a basic interface
let mut instructions = HashMap::new();
let mut account_layouts = HashMap::new();
let mut error_codes = HashMap::new();
// Example DLMM program interface
if program_id.to_string() == "LBUZKhRxPF3XUpBCjp4YzTKgLccjZhTSDM9YuVaPwxo" {
// Initialize Pool instruction
instructions.insert("initialize_lb_pair".to_string(), InstructionTemplate {
name: "initialize_lb_pair".to_string(),
discriminator: vec![0x95, 0x4f, 0x19, 0x6e, 0x7b, 0x4a, 0x29, 0x35],
accounts: vec![
AccountTemplate { name: "lb_pair".to_string(), is_mut: true, is_signer: false, is_optional: false },
AccountTemplate { name: "bin_array_bitmap_extension".to_string(), is_mut: true, is_signer: false, is_optional: true },
AccountTemplate { name: "token_mint_x".to_string(), is_mut: false, is_signer: false, is_optional: false },
AccountTemplate { name: "token_mint_y".to_string(), is_mut: false, is_signer: false, is_optional: false },
],
data_layout: vec![
DataField { name: "active_id".to_string(), field_type: FieldType::I32, offset: 0, size: 4 },
DataField { name: "bin_step".to_string(), field_type: FieldType::U16, offset: 4, size: 2 },
],
});
// Add Position account layout
account_layouts.insert("Position".to_string(), AccountLayout {
discriminator: vec![0xaa, 0xbc, 0x8f, 0xe4, 0x79, 0x89, 0x4e, 0x3a],
fields: vec![
DataField { name: "lb_pair".to_string(), field_type: FieldType::Pubkey, offset: 8, size: 32 },
DataField { name: "owner".to_string(), field_type: FieldType::Pubkey, offset: 40, size: 32 },
DataField { name: "liquidity_shares".to_string(), field_type: FieldType::Array(Box::new(FieldType::U64), 70), offset: 72, size: 560 },
],
size: 632,
});
error_codes.insert(0x1770, "InvalidBinId".to_string());
error_codes.insert(0x1771, "InvalidInput".to_string());
error_codes.insert(0x1772, "InsufficientLiquidity".to_string());
}
Ok(ProgramInterface {
program_id: *program_id,
instructions,
account_layouts,
error_codes,
})
}
}
// State Manager for on-chain state tracking
pub struct StateManager {
rpc_client: Arc<RpcClient>,
state_cache: Arc<DashMap<Pubkey, CachedAccountState>>,
subscription_manager: Arc<SubscriptionManager>,
}
#[derive(Debug, Clone)]
struct CachedAccountState {
pub account: Account,
pub last_updated: DateTime<Utc>,
pub update_count: u64,
pub is_subscribed: bool,
}
impl StateManager {
pub fn new(rpc_client: Arc<RpcClient>) -> Self {
let subscription_manager = Arc::new(SubscriptionManager::new(rpc_client.clone()));
Self {
rpc_client,
state_cache: Arc::new(DashMap::new()),
subscription_manager,
}
}
pub async fn get_account_state(&self, pubkey: &Pubkey) -> Result<Account> {
// Check cache first
if let Some(cached) = self.state_cache.get(pubkey) {
let cache_age = Utc::now().signed_duration_since(cached.last_updated);
if cache_age.num_seconds() < 30 {
log::debug!("Returning cached account state for {}", pubkey);
return Ok(cached.account.clone());
}
}
// Fetch fresh state
log::debug!("Fetching fresh account state for {}", pubkey);
let account = self.rpc_client
.get_account(pubkey)
.map_err(|e| anyhow!("Failed to fetch account {}: {}", pubkey, e))?;
// Update cache
self.state_cache.insert(*pubkey, CachedAccountState {
account: account.clone(),
last_updated: Utc::now(),
update_count: 1,
is_subscribed: false,
});
Ok(account)
}
pub async fn create_program_derived_address(
&self,
program_id: &Pubkey,
seeds: &[&[u8]],
) -> Result<(Pubkey, u8)> {
let (pda, bump) = Pubkey::find_program_address(seeds, program_id);
log::info!("Created PDA: {} with bump: {}", pda, bump);
// Check if PDA already exists
let account_exists = self.rpc_client.get_account(&pda).is_ok();
if account_exists {
log::info!("PDA already exists: {}", pda);
}
Ok((pda, bump))
}
pub async fn initialize_pda_account(
&self,
pda: &Pubkey,
space: u64,
owner: &Pubkey,
authority: &Keypair,
) -> Result<Instruction> {
let rent_exemption = self.rpc_client
.get_minimum_balance_for_rent_exemption(space as usize)
.map_err(|e| anyhow!("Failed to get rent exemption: {}", e))?;
Ok(system_instruction::create_account(
&authority.pubkey(),
pda,
rent_exemption,
space,
owner,
))
}
pub async fn subscribe_to_account_updates(&self, pubkey: &Pubkey) -> Result<()> {
log::info!("Subscribing to account updates for {}", pubkey);
// In production, this would set up WebSocket subscription
self.subscription_manager.subscribe(pubkey).await?;
// Mark as subscribed in cache
if let Some(mut cached) = self.state_cache.get_mut(pubkey) {
cached.is_subscribed = true;
}
Ok(())
}
}
// Instruction Builder for complex operations
pub struct InstructionBuilder {
program_registry: Arc<ProgramRegistry>,
}
impl InstructionBuilder {
pub fn new(program_registry: Arc<ProgramRegistry>) -> Self {
Self { program_registry }
}
pub async fn build_cpi_instruction(
&self,
cpi_call: &CrossProgramInvocation,
authority: &Keypair,
) -> Result<Instruction> {
let program_config = self.program_registry
.get_program_config(&cpi_call.target_program)
.ok_or_else(|| anyhow!("Program not registered: {}", cpi_call.target_program))?;
if !program_config.enable_cpi {
return Err(anyhow!("CPI disabled for program: {}", cpi_call.target_program));
}
// Build instruction with proper account ordering
let mut accounts = Vec::new();
// Add authority as signer
accounts.push(AccountMeta::new_readonly(authority.pubkey(), true));
// Add CPI accounts
accounts.extend_from_slice(&cpi_call.accounts);
// Add system program for any account creation
accounts.push(AccountMeta::new_readonly(solana_program::system_program::id(), false));
Ok(Instruction {
program_id: cpi_call.target_program,
accounts,
data: cpi_call.instruction_data.clone(),
})
}
pub async fn build_dlmm_position_instruction(
&self,
position_config: &DLMMPositionConfig,
authority: &Keypair,
) -> Result<Instruction> {
let dlmm_program_id = Pubkey::new_from_array([
// DLMM program ID bytes
0x1b, 0x23, 0x45, 0x67, 0x89, 0xab, 0xcd, 0xef,
0x12, 0x34, 0x56, 0x78, 0x9a, 0xbc, 0xde, 0xf1,
0x23, 0x45, 0x67, 0x89, 0xab, 0xcd, 0xef, 0x12,
0x34, 0x56, 0x78, 0x9a, 0xbc, 0xde, 0xf1, 0x23,
]);
// Create position PDA
let position_seeds = &[
b"position",
authority.pubkey().as_ref(),
position_config.lb_pair.as_ref(),
];
let (position_pda, _bump) = Pubkey::find_program_address(position_seeds, &dlmm_program_id);
// Build accounts for position instruction
let accounts = vec![
AccountMeta::new(position_pda, false),
AccountMeta::new_readonly(position_config.lb_pair, false),
AccountMeta::new_readonly(authority.pubkey(), true),
AccountMeta::new_readonly(system_program::id(), false),
];
// Build instruction data
let mut instruction_data = Vec::new();
instruction_data.extend_from_slice(&[0x95, 0x4f, 0x19, 0x6e]); // Instruction discriminator
instruction_data.extend_from_slice(&position_config.active_bin_id.to_le_bytes());
instruction_data.extend_from_slice(&position_config.width.to_le_bytes());
Ok(Instruction {
program_id: dlmm_program_id,
accounts,
data: instruction_data,
})
}
pub async fn build_token_transfer_instruction(
&self,
from: &Pubkey,
to: &Pubkey,
authority: &Keypair,
amount: u64,
) -> Result<Instruction> {
Ok(token::transfer(
&token::ID,
from,
to,
&authority.pubkey(),
&[],
amount,
)?)
}
pub async fn build_cleanup_instruction(
&self,
cleanup_op: &CleanupOperation,
authority: &Keypair,
) -> Result<Instruction> {
match cleanup_op.operation_type {
CleanupType::CloseAccount => {
Ok(token::close_account(
&token::ID,
&cleanup_op.target_account,
&authority.pubkey(),
&authority.pubkey(),
&[],
)?)
}
CleanupType::TransferRemainingTokens => {
self.build_token_transfer_instruction(
&cleanup_op.target_account,
&cleanup_op.destination_account.unwrap(),
authority,
cleanup_op.amount.unwrap_or(0),
).await
}
}
}
}
// Transaction Executor with advanced error handling
pub struct TransactionExecutor {
rpc_client: Arc<RpcClient>,
retry_policy: RetryPolicy,
}
#[derive(Debug, Clone)]
pub struct RetryPolicy {
pub max_attempts: u32,
pub base_delay: Duration,
pub max_delay: Duration,
pub backoff_multiplier: f64,
}
impl Default for RetryPolicy {
fn default() -> Self {
Self {
max_attempts: 3,
base_delay: Duration::from_millis(100),
max_delay: Duration::from_secs(5),
backoff_multiplier: 2.0,
}
}
}
impl TransactionExecutor {
pub fn new(rpc_client: Arc<RpcClient>) -> Self {
Self {
rpc_client,
retry_policy: RetryPolicy::default(),
}
}
pub async fn execute_transaction(&self, transaction: &Transaction) -> Result<Signature> {
let mut attempt = 0;
let mut delay = self.retry_policy.base_delay;
while attempt < self.retry_policy.max_attempts {
attempt += 1;
match self.rpc_client.send_and_confirm_transaction(transaction) {
Ok(signature) => {
log::info!("Transaction successful on attempt {}: {}", attempt, signature);
return Ok(signature);
}
Err(e) => {
log::warn!("Transaction attempt {} failed: {}", attempt, e);
if attempt < self.retry_policy.max_attempts {
log::info!("Retrying in {:?}", delay);
sleep(delay).await;
delay = std::cmp::min(
Duration::from_millis((delay.as_millis() as f64 * self.retry_policy.backoff_multiplier) as u64),
self.retry_policy.max_delay,
);
} else {
return Err(anyhow!("Transaction failed after {} attempts: {}", self.retry_policy.max_attempts, e));
}
}
}
}
Err(anyhow!("Transaction execution exhausted all retry attempts"))
}
pub async fn simulate_transaction(&self, transaction: &Transaction) -> Result<SimulationResult> {
log::info!("Simulating transaction before execution");
let simulation = self.rpc_client
.simulate_transaction(transaction)
.map_err(|e| anyhow!("Transaction simulation failed: {}", e))?;
if let Some(err) = simulation.value.err {
return Err(anyhow!("Transaction simulation error: {:?}", err));
}
let logs = simulation.value.logs.unwrap_or_default();
let units_consumed = simulation.value.units_consumed.unwrap_or(0);
Ok(SimulationResult {
success: true,
logs,
compute_units_consumed: units_consumed,
accounts_accessed: simulation.value.accounts.map(|acc| acc.len()).unwrap_or(0),
})
}
}
// Subscription Manager for real-time updates
pub struct SubscriptionManager {
rpc_client: Arc<RpcClient>,
active_subscriptions: Arc<DashMap<Pubkey, SubscriptionState>>,
}
#[derive(Debug, Clone)]
struct SubscriptionState {
pub account: Pubkey,
pub subscription_id: u64,
pub created_at: DateTime<Utc>,
pub update_count: u64,
}
impl SubscriptionManager {
pub fn new(rpc_client: Arc<RpcClient>) -> Self {
Self {
rpc_client,
active_subscriptions: Arc::new(DashMap::new()),
}
}
pub async fn subscribe(&self, account: &Pubkey) -> Result<()> {
// In production, this would create WebSocket subscription
log::info!("Creating subscription for account: {}", account);
let subscription_state = SubscriptionState {
account: *account,
subscription_id: rand::random(),
created_at: Utc::now(),
update_count: 0,
};
self.active_subscriptions.insert(*account, subscription_state);
Ok(())
}
pub async fn unsubscribe(&self, account: &Pubkey) -> Result<()> {
log::info!("Removing subscription for account: {}", account);
self.active_subscriptions.remove(account);
Ok(())
}
}
// Operation and Result Types
#[derive(Debug, Clone)]
pub struct CrossProgramOperation {
pub operation_id: Uuid,
pub required_pdas: Vec<PDAConfig>,
pub cpi_calls: Vec<CrossProgramInvocation>,
pub cleanup_operations: Option<Vec<CleanupOperation>>,
pub max_compute_units: u32,
}
#[derive(Debug, Clone)]
pub struct PDAConfig {
pub program_id: Pubkey,
pub seeds: Vec<Vec<u8>>,
pub space: u64,
pub owner: Pubkey,
}
#[derive(Debug, Clone)]
pub struct CleanupOperation {
pub operation_type: CleanupType,
pub target_account: Pubkey,
pub destination_account: Option<Pubkey>,
pub amount: Option<u64>,
}
#[derive(Debug, Clone)]
pub enum CleanupType {
CloseAccount,
TransferRemainingTokens,
}
#[derive(Debug)]
pub struct IntegrationResult {
pub operation_id: Uuid,
pub signature: Signature,
pub success: bool,
pub execution_time: Duration,
pub state_changes: Vec<StateChange>,
pub created_pdas: Vec<Pubkey>,
}
#[derive(Debug)]
pub struct ExecutionResult {
pub signature: Signature,
pub success: bool,
pub state_changes: Vec<StateChange>,
pub created_pdas: Vec<Pubkey>,
}
#[derive(Debug, Clone)]
pub struct StateChange {
pub account: Pubkey,
pub old_data_hash: String,
pub new_data_hash: String,
pub change_type: StateChangeType,
}
#[derive(Debug, Clone)]
pub enum StateChangeType {
Created,
Modified,
Closed,
}
#[derive(Debug)]
pub struct SimulationResult {
pub success: bool,
pub logs: Vec<String>,
pub compute_units_consumed: u64,
pub accounts_accessed: usize,
}
#[derive(Debug, Clone)]
pub struct DLMMPositionConfig {
pub lb_pair: Pubkey,
pub active_bin_id: i32,
pub width: u32,
}
// Implementation helpers
impl OnChainIntegrationManager {
async fn validate_operation(&self, operation: &CrossProgramOperation) -> Result<()> {
// Validate all required programs are registered
for cpi_call in &operation.cpi_calls {
if !self.program_registry.programs.contains_key(&cpi_call.target_program) {
return Err(anyhow!("Unregistered program: {}", cpi_call.target_program));
}
}
// Validate compute unit limits
if operation.max_compute_units > 1_400_000 {
return Err(anyhow!("Compute unit limit too high: {}", operation.max_compute_units));
}
Ok(())
}
async fn create_pda_instruction(
&self,
pda_config: &PDAConfig,
authority: &Keypair,
) -> Result<Instruction> {
let seeds_refs: Vec<&[u8]> = pda_config.seeds.iter().map(|s| s.as_slice()).collect();
let (pda, _bump) = Pubkey::find_program_address(&seeds_refs, &pda_config.program_id);
self.state_manager.initialize_pda_account(
&pda,
pda_config.space,
&pda_config.owner,
authority,
).await
}
async fn wait_for_confirmation(&self, signature: &Signature) -> Result<()> {
log::info!("Waiting for transaction confirmation: {}", signature);
let mut attempts = 0;
const MAX_ATTEMPTS: u32 = 30;
while attempts < MAX_ATTEMPTS {
match self.rpc_client.get_signature_status(signature) {
Ok(Some(status)) => {
if let Some(result) = status {
if result.is_ok() {
log::info!("Transaction confirmed: {}", signature);
return Ok(());
} else {
return Err(anyhow!("Transaction failed: {:?}", result));
}
}
}
Ok(None) => {
// Transaction not found yet, continue waiting
}
Err(e) => {
log::warn!("Error checking transaction status: {}", e);
}
}
attempts += 1;
sleep(Duration::from_millis(1000)).await;
}
Err(anyhow!("Transaction confirmation timeout"))
}
async fn extract_state_changes(&self, _transaction: &Transaction) -> Result<Vec<StateChange>> {
// Extract state changes from transaction logs
Ok(Vec::new()) // Placeholder
}
async fn extract_created_pdas(&self, _transaction: &Transaction) -> Result<Vec<Pubkey>> {
// Extract newly created PDAs from transaction
Ok(Vec::new()) // Placeholder
}
async fn update_integration_state(
&self,
_operation: &CrossProgramOperation,
_result: &ExecutionResult,
) -> Result<()> {
// Update integration state after successful operation
let mut metrics = self.performance_monitor.write();
metrics.state_updates += 1;
Ok(())
}
}
// Example usage and testing
#[tokio::main]
async fn main() -> Result<()> {
env_logger::init();
// Initialize integration manager
let program_configs = vec![
ProgramIntegrationConfig {
program_id: Pubkey::new_from_array([0x1b; 32]), // DLMM program
authority: Keypair::new().pubkey(),
max_accounts_per_instruction: 20,
enable_cpi: true,
enable_pda_creation: true,
gas_optimization: true,
},
];
let integration_manager = OnChainIntegrationManager::new(
"https://api.devnet.solana.com",
program_configs,
).await?;
// Example: Create a complex DLMM position with state management
let authority = Keypair::new();
let lb_pair = Pubkey::new_unique();
// Create PDA for position
let position_seeds = vec![
b"position".to_vec(),
authority.pubkey().to_bytes().to_vec(),
lb_pair.to_bytes().to_vec(),
];
let pda_config = PDAConfig {
program_id: Pubkey::new_from_array([0x1b; 32]),
seeds: position_seeds,
space: 1024,
owner: Pubkey::new_from_array([0x1b; 32]),
};
// Build cross-program operation
let operation = CrossProgramOperation {
operation_id: Uuid::new_v4(),
required_pdas: vec![pda_config],
cpi_calls: vec![
CrossProgramInvocation {
target_program: Pubkey::new_from_array([0x1b; 32]),
instruction_data: vec![0x95, 0x4f, 0x19, 0x6e], // Initialize position
accounts: vec![
AccountMeta::new(Pubkey::new_unique(), false),
AccountMeta::new_readonly(authority.pubkey(), true),
],
signers: vec![authority],
},
],
cleanup_operations: None,
max_compute_units: 200_000,
};
// Execute operation
match integration_manager.execute_cross_program_operation(operation, &authority).await {
Ok(result) => {
log::info!("Integration successful!");
log::info!("Operation ID: {}", result.operation_id);
log::info!("Transaction: {}", result.signature);
log::info!("Execution time: {:?}", result.execution_time);
log::info!("Created PDAs: {:?}", result.created_pdas);
}
Err(e) => {
log::error!("Integration failed: {}", e);
}
}
// Example: Subscribe to account updates for real-time state management
let position_pda = Pubkey::new_unique();
integration_manager.state_manager
.subscribe_to_account_updates(&position_pda).await?;
// Monitor integration metrics
tokio::spawn(async move {
let mut interval = interval(Duration::from_secs(30));
loop {
interval.tick().await;
let metrics = integration_manager.performance_monitor.read();
log::info!("Integration Metrics - CPI Calls: {}, Success Rate: {:.2}%",
metrics.total_cpi_calls,
if metrics.successful_integrations + metrics.failed_integrations > 0 {
(metrics.successful_integrations as f64 /
(metrics.successful_integrations + metrics.failed_integrations) as f64) * 100.0
} else {
0.0
}
);
}
});
// Keep the integration service running
log::info!("On-chain integration service started");
loop {
sleep(Duration::from_secs(1)).await;
}
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn test_integration_manager_creation() {
let configs = vec![ProgramIntegrationConfig {
program_id: Pubkey::new_unique(),
authority: Keypair::new().pubkey(),
max_accounts_per_instruction: 10,
enable_cpi: true,
enable_pda_creation: true,
gas_optimization: false,
}];
let manager = OnChainIntegrationManager::new("https://api.devnet.solana.com", configs).await;
assert!(manager.is_ok());
}
#[tokio::test]
async fn test_pda_creation() {
let manager = OnChainIntegrationManager::new("https://api.devnet.solana.com", vec![]).await.unwrap();
let program_id = Pubkey::new_unique();
let seeds = &[b"test", b"pda"];
let (pda, bump) = manager.state_manager
.create_program_derived_address(&program_id, seeds).await.unwrap();
assert_ne!(pda, Pubkey::default());
assert!(bump <= 255);
}
#[tokio::test]
async fn test_instruction_building() {
let program_registry = Arc::new(ProgramRegistry::new(vec![]));
let builder = InstructionBuilder::new(program_registry);
let authority = Keypair::new();
let position_config = DLMMPositionConfig {
lb_pair: Pubkey::new_unique(),
active_bin_id: 8388608, // ID 0
width: 1,
};
let instruction = builder.build_dlmm_position_instruction(&position_config, &authority).await.unwrap();
assert_eq!(instruction.accounts.len(), 4);
assert!(!instruction.data.is_empty());
}
#[tokio::test]
async fn test_transaction_simulation() {
let rpc_client = Arc::new(RpcClient::new("https://api.devnet.solana.com".to_string()));
let executor = TransactionExecutor::new(rpc_client);
let transaction = Transaction::default();
// Should fail simulation for empty transaction, but test the interface
let simulation_result = executor.simulate_transaction(&transaction).await;
// We expect this to fail but the call should work
assert!(simulation_result.is_err());
}
#[test]
fn test_retry_policy() {
let policy = RetryPolicy::default();
assert_eq!(policy.max_attempts, 3);
assert_eq!(policy.base_delay, Duration::from_millis(100));
assert_eq!(policy.backoff_multiplier, 2.0);
}
#[tokio::test]
async fn test_program_registry() {
let config = ProgramIntegrationConfig {
program_id: Pubkey::new_unique(),
authority: Keypair::new().pubkey(),
max_accounts_per_instruction: 15,
enable_cpi: true,
enable_pda_creation: true,
gas_optimization: true,
};
let registry = ProgramRegistry::new(vec![config.clone()]);
let retrieved = registry.get_program_config(&config.program_id);
assert!(retrieved.is_some());
assert_eq!(retrieved.unwrap().max_accounts_per_instruction, 15);
}
#[tokio::test]
async fn test_state_manager() {
let rpc_client = Arc::new(RpcClient::new("https://api.devnet.solana.com".to_string()));
let state_manager = StateManager::new(rpc_client);
let test_account = Pubkey::new_unique();
// Test subscription
let subscription_result = state_manager.subscribe_to_account_updates(&test_account).await;
assert!(subscription_result.is_ok());
}
}
// Advanced PDA Management
impl StateManager {
pub async fn create_associated_token_account_pda(
&self,
owner: &Pubkey,
mint: &Pubkey,
authority: &Keypair,
) -> Result<(Pubkey, Instruction)> {
let ata_address = spl_associated_token_account::get_associated_token_address(owner, mint);
// Check if ATA already exists
let account_exists = self.rpc_client.get_account(&ata_address).is_ok();
if account_exists {
log::info!("Associated token account already exists: {}", ata_address);
return Ok((ata_address, Instruction {
program_id: Pubkey::default(),
accounts: vec![],
data: vec![],
}));
}
let create_ata_instruction = spl_associated_token_account::instruction::create_associated_token_account(
&authority.pubkey(),
owner,
mint,
&spl_token::id(),
);
log::info!("Creating associated token account: {}", ata_address);
Ok((ata_address, create_ata_instruction))
}
pub async fn manage_token_account_lifecycle(
&self,
mint: &Pubkey,
owner: &Pubkey,
authority: &Keypair,
) -> Result<TokenAccountLifecycle> {
let start_time = Instant::now();
// Create or get existing ATA
let (ata_address, create_instruction) = self
.create_associated_token_account_pda(owner, mint, authority).await?;
// Get token account state
let account_state = if create_instruction.data.is_empty() {
// Account exists, get current state
let account_info = self.get_account_state(&ata_address).await?;
let token_account = TokenAccount::unpack(&account_info.data)
.map_err(|e| anyhow!("Failed to parse token account: {}", e))?;
TokenAccountState::Existing {
balance: token_account.amount,
is_frozen: token_account.state == spl_token::state::AccountState::Frozen,
}
} else {
// Account needs to be created
TokenAccountState::NeedsCreation {
create_instruction,
}
};
Ok(TokenAccountLifecycle {
ata_address,
current_state: account_state,
mint: *mint,
owner: *owner,
management_time: start_time.elapsed(),
})
}
}
#[derive(Debug)]
pub struct TokenAccountLifecycle {
pub ata_address: Pubkey,
pub current_state: TokenAccountState,
pub mint: Pubkey,
pub owner: Pubkey,
pub management_time: Duration,
}
#[derive(Debug)]
pub enum TokenAccountState {
NeedsCreation { create_instruction: Instruction },
Existing { balance: u64, is_frozen: bool },
}
// Advanced Error Handling
#[derive(thiserror::Error, Debug)]
pub enum IntegrationError {
#[error("Program not registered: {program_id}")]
ProgramNotRegistered { program_id: Pubkey },
#[error("PDA creation failed: {reason}")]
PDACreationFailed { reason: String },
#[error("CPI call failed: {program_id}, reason: {reason}")]
CPIFailed { program_id: Pubkey, reason: String },
#[error("State synchronization failed: {account}")]
StateSyncFailed { account: Pubkey },
#[error("Compute budget exceeded: {used}/{limit}")]
ComputeBudgetExceeded { used: u32, limit: u32 },
#[error("Account validation failed: {account}, reason: {reason}")]
AccountValidationFailed { account: Pubkey, reason: String },
}
// Comprehensive example usage
pub async fn example_complex_integration() -> Result<()> {
env_logger::init();
// Create integration manager with multiple programs
let program_configs = vec![
ProgramIntegrationConfig {
program_id: Pubkey::from_str("LBUZKhRxPF3XUpBCjp4YzTKgLccjZhTSDM9YuVaPwxo")?, // DLMM
authority: Keypair::new().pubkey(),
max_accounts_per_instruction: 25,
enable_cpi: true,
enable_pda_creation: true,
gas_optimization: true,
},
ProgramIntegrationConfig {
program_id: spl_token::id(), // SPL Token
authority: Keypair::new().pubkey(),
max_accounts_per_instruction: 10,
enable_cpi: true,
enable_pda_creation: false,
gas_optimization: false,
},
];
let integration_manager = OnChainIntegrationManager::new(
"https://api.devnet.solana.com",
program_configs,
).await?;
let authority = Keypair::new();
// Example 1: Create DLMM position with token account management
let mint_x = Pubkey::new_unique();
let mint_y = Pubkey::new_unique();
let lb_pair = Pubkey::new_unique();
// Manage token account lifecycle
let token_x_lifecycle = integration_manager.state_manager
.manage_token_account_lifecycle(&mint_x, &authority.pubkey(), &authority).await?;
let token_y_lifecycle = integration_manager.state_manager
.manage_token_account_lifecycle(&mint_y, &authority.pubkey(), &authority).await?;
log::info!("Token X ATA: {}", token_x_lifecycle.ata_address);
log::info!("Token Y ATA: {}", token_y_lifecycle.ata_address);
// Create complex operation combining multiple programs
let position_seeds = vec![
b"position".to_vec(),
authority.pubkey().to_bytes().to_vec(),
lb_pair.to_bytes().to_vec(),
];
let complex_operation = CrossProgramOperation {
operation_id: Uuid::new_v4(),
required_pdas: vec![
PDAConfig {
program_id: Pubkey::new_from_array([0x1b; 32]),
seeds: position_seeds,
space: 1024,
owner: Pubkey::new_from_array([0x1b; 32]),
},
],
cpi_calls: vec![
// Token approvals
CrossProgramInvocation {
target_program: spl_token::id(),
instruction_data: vec![0x04], // Approve instruction
accounts: vec![
AccountMeta::new(token_x_lifecycle.ata_address, false),
AccountMeta::new_readonly(authority.pubkey(), true),
],
signers: vec![],
},
// DLMM position creation
CrossProgramInvocation {
target_program: Pubkey::new_from_array([0x1b; 32]),
instruction_data: vec![0x95, 0x4f, 0x19, 0x6e], // Initialize position
accounts: vec![
AccountMeta::new(Pubkey::new_unique(), false),
AccountMeta::new_readonly(lb_pair, false),
AccountMeta::new_readonly(authority.pubkey(), true),
],
signers: vec![],
},
],
cleanup_operations: Some(vec![
CleanupOperation {
operation_type: CleanupType::TransferRemainingTokens,
target_account: token_x_lifecycle.ata_address,
destination_account: Some(authority.pubkey()),
amount: None,
},
]),
max_compute_units: 300_000,
};
// Execute the complex integration
match integration_manager.execute_cross_program_operation(complex_operation, &authority).await {
Ok(result) => {
log::info!("Complex integration completed successfully!");
log::info!("Total execution time: {:?}", result.execution_time);
}
Err(e) => {
log::error!("Complex integration failed: {}", e);
}
}
Ok(())
}
// Additional helper functions for string parsing
use std::str::FromStr;
impl FromStr for FieldType {
type Err = anyhow::Error;
fn from_str(s: &str) -> Result<Self> {
match s {
"u8" => Ok(FieldType::U8),
"u16" => Ok(FieldType::U16),
"u32" => Ok(FieldType::U32),
"u64" => Ok(FieldType::U64),
"i8" => Ok(FieldType::I8),
"i16" => Ok(FieldType::I16),
"i32" => Ok(FieldType::I32),
"i64" => Ok(FieldType::I64),
"f32" => Ok(FieldType::F32),
"f64" => Ok(FieldType::F64),
"bool" => Ok(FieldType::Bool),
"pubkey" => Ok(FieldType::Pubkey),
"string" => Ok(FieldType::String),
"bytes" => Ok(FieldType::Bytes),
_ => Err(anyhow!("Unknown field type: {}", s)),
}
}
}
Key Features​
1. Cross Program Invocation (CPI)​
- Multi-Program Operations: Execute operations across multiple Solana programs atomically
- CPI Safety: Proper signature and account validation for cross-program calls
- Error Propagation: Comprehensive error handling across program boundaries
2. Program Derived Address (PDA) Management​
- PDA Creation: Automated PDA creation with proper seed management
- Lifecycle Management: Complete lifecycle management of PDAs and associated accounts
- Address Derivation: Deterministic address derivation with collision detection
3. Advanced State Management​
- Real-time Synchronization: Live state synchronization using WebSocket subscriptions
- State Caching: Intelligent caching of account states for performance
- State Validation: Comprehensive validation of account states before operations
4. Transaction Orchestration​
- Atomic Execution: Complex multi-step operations executed atomically
- Retry Logic: Sophisticated retry mechanisms with exponential backoff
- Simulation: Pre-execution simulation to catch errors early
Usage Patterns​
Basic Program Integration​
let integration_manager = OnChainIntegrationManager::new(rpc_url, program_configs).await?;
let result = integration_manager.execute_cross_program_operation(operation, &authority).await?;
PDA Management​
let (pda, bump) = state_manager.create_program_derived_address(&program_id, seeds).await?;
let ata_instruction = state_manager.create_associated_token_account_pda(&owner, &mint, &authority).await?;
Real-time State Tracking​
// Subscribe to account updates
state_manager.subscribe_to_account_updates(&position_pda).await?;
// Get cached or fresh state
let account_state = state_manager.get_account_state(&pubkey).await?;
Complex Operation Building​
let operation = CrossProgramOperation {
operation_id: Uuid::new_v4(),
required_pdas: vec![position_pda_config],
cpi_calls: vec![token_approval, position_creation],
cleanup_operations: Some(vec![token_transfer]),
max_compute_units: 200_000,
};
Error Handling and Recovery​
Comprehensive Error Types​
- Program Registration Errors: Validation of program registration and configuration
- PDA Creation Failures: Detailed error reporting for PDA creation issues
- CPI Execution Errors: Cross-program invocation failure analysis
- State Synchronization Issues: Real-time state management error handling
Automatic Recovery​
// Automatic retry with exponential backoff
let retry_policy = RetryPolicy {
max_attempts: 5,
base_delay: Duration::from_millis(200),
max_delay: Duration::from_secs(10),
backoff_multiplier: 2.0,
};
Security Best Practices​
- Signature Verification: Comprehensive signature validation for all operations
- Account Ownership: Strict account ownership verification before modifications
- PDA Security: Secure PDA creation with proper seed validation
- Compute Budget: Careful compute unit management to prevent DoS
- Access Control: Role-based access control for sensitive operations
Performance Optimization​
- Parallel Execution: Concurrent execution of independent operations
- State Caching: Intelligent account state caching for reduced RPC calls
- Batch Operations: Batching multiple operations into single transactions
- Compute Efficiency: Optimized instruction ordering for minimal compute usage
This on-chain integration system provides enterprise-grade capabilities for building complex Solana applications that interact seamlessly with multiple programs while maintaining security, performance, and reliability.