Transaction Producer
⚙️ Principle of RocketMQ Transaction Message Model
The transaction message process of RocketMQ is divided into three steps:
1. Send a “Half Message” (Prepared Message)
The producer first sends a special “half message” to the Broker. This message:
- Is not visible to consumers.
- Has been stored by the Broker successfully but has not been committed yet.
2. Execute the local transaction
After the half message has been sent successfully, the producer executes its local transaction logic (such as database operations).
3. Commit or roll back the transaction message
- Local transaction succeeds ⇒ Commit the transaction message (the message becomes deliverable and can be consumed by the consumer).
- Local transaction fails ⇒ Roll back the transaction message (the message is deleted and will never be delivered).
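The three steps above can be sketched with a toy in-memory model. This is only an illustration, not the RocketMQ client API: `ToyBroker`, `Outcome`, and `send_in_transaction` are hypothetical names invented for this sketch, and the closure stands in for real database work.

```rust
use std::collections::HashMap;

/// Final decision for a transaction message (hypothetical type for this sketch).
#[derive(Debug, Clone, Copy, PartialEq)]
enum Outcome {
    Commit,
    Rollback,
}

/// Toy in-memory broker: half messages are stored but hidden from consumers.
#[derive(Default)]
struct ToyBroker {
    half: HashMap<u64, String>,
    deliverable: Vec<String>,
    next_id: u64,
}

impl ToyBroker {
    /// Step 1: store the half message and return its transaction id.
    fn send_half(&mut self, body: &str) -> u64 {
        let id = self.next_id;
        self.next_id += 1;
        self.half.insert(id, body.to_string());
        id
    }

    /// Step 3: a commit makes the message deliverable, a rollback discards it.
    fn end_transaction(&mut self, id: u64, outcome: Outcome) {
        if let Some(body) = self.half.remove(&id) {
            if outcome == Outcome::Commit {
                self.deliverable.push(body);
            }
        }
    }
}

/// Runs all three steps for one message; `local_tx` stands in for database work.
fn send_in_transaction(
    broker: &mut ToyBroker,
    body: &str,
    local_tx: impl FnOnce() -> Result<(), String>,
) -> Outcome {
    let id = broker.send_half(body); // step 1: half message
    let outcome = match local_tx() {
        // step 2: local transaction
        Ok(()) => Outcome::Commit,
        Err(_) => Outcome::Rollback,
    };
    broker.end_transaction(id, outcome); // step 3: commit or roll back
    outcome
}

fn main() {
    let mut broker = ToyBroker::default();
    send_in_transaction(&mut broker, "order-1", || Ok(()));
    send_in_transaction(&mut broker, "order-2", || Err("db error".to_string()));
    // Only the committed message is visible to consumers.
    println!("deliverable: {:?}", broker.deliverable);
}
```

In this model only `order-1` ends up deliverable; `order-2` is dropped because its local transaction failed.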
✅ Reliable back-check mechanism (transaction status back-check)
If the Broker does not receive a commit or rollback instruction for an extended period, it actively queries the producer for the local transaction status.
This prevents half messages from being left unresolved when the producer crashes or the network is unstable.
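The back-check idea can be modeled in a few lines of plain Rust. This is a sketch under stated assumptions rather than the Broker's actual implementation: `ToyBroker`, `PendingHalf`, `check_pass`, and the `max_checks` retry limit are hypothetical names chosen for the illustration, and the `check` closure plays the role of the producer answering the Broker's query.

```rust
use std::collections::BTreeMap;

/// What the producer answers when the broker asks about a transaction.
#[derive(Debug, Clone, Copy, PartialEq)]
enum State {
    Commit,
    Rollback,
    Unknown,
}

/// A half message that is still waiting for a final decision.
struct PendingHalf {
    body: String,
    checks: u32,
}

/// Toy broker (not the RocketMQ API): tracks unresolved half messages.
struct ToyBroker {
    pending: BTreeMap<u64, PendingHalf>,
    delivered: Vec<String>,
    max_checks: u32, // hypothetical retry limit for this sketch
}

impl ToyBroker {
    fn new(max_checks: u32) -> Self {
        Self { pending: BTreeMap::new(), delivered: Vec::new(), max_checks }
    }

    fn add_half(&mut self, id: u64, body: &str) {
        self.pending.insert(id, PendingHalf { body: body.to_string(), checks: 0 });
    }

    /// One back-check pass: ask the producer about every pending half message.
    fn check_pass(&mut self, check: impl Fn(u64) -> State) {
        let max_checks = self.max_checks;
        let ids: Vec<u64> = self.pending.keys().copied().collect();
        for id in ids {
            match check(id) {
                State::Commit => {
                    if let Some(half) = self.pending.remove(&id) {
                        self.delivered.push(half.body);
                    }
                }
                State::Rollback => {
                    self.pending.remove(&id);
                }
                State::Unknown => {
                    // Count the attempt; give up once the limit is reached.
                    let exhausted = match self.pending.get_mut(&id) {
                        Some(half) => {
                            half.checks += 1;
                            half.checks >= max_checks
                        }
                        None => false,
                    };
                    if exhausted {
                        self.pending.remove(&id);
                    }
                }
            }
        }
    }
}

fn main() {
    let mut broker = ToyBroker::new(2);
    broker.add_half(1, "paid");
    broker.add_half(2, "cancelled");
    broker.add_half(3, "lost");
    let check = |id: u64| match id {
        1 => State::Commit,
        2 => State::Rollback,
        _ => State::Unknown,
    };
    broker.check_pass(&check);
    broker.check_pass(&check);
    println!("delivered: {:?}, pending: {}", broker.delivered, broker.pending.len());
}
```

After two passes the committed message has been delivered, the rolled-back one is gone, and the message whose state stayed unknown has been dropped because it reached the sketch's retry limit.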
How to send transaction messages
[dependencies]
tokio = { version = "1.45.1", features = ["macros", "rt-multi-thread", "signal"] }
rocketmq-client-rust = { version = "0.5.0" }
rocketmq-error = { version = "0.5.0" }
rocketmq-common = { version = "0.5.0" }
cheetah-string = { version = "0.1.6" }
parking_lot = "0.12.4"
Example code:
use std::any::Any;
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::atomic::AtomicI32;
use cheetah_string::CheetahString;
use parking_lot::Mutex;
use rocketmq_client_rust::producer::local_transaction_state::LocalTransactionState;
use rocketmq_client_rust::producer::mq_producer::MQProducer;
use rocketmq_client_rust::producer::transaction_listener::TransactionListener;
use rocketmq_client_rust::producer::transaction_mq_producer::TransactionMQProducer;
use rocketmq_common::common::message::MessageTrait;
use rocketmq_common::common::message::message_ext::MessageExt;
use rocketmq_common::common::message::message_single::Message;
use rocketmq_error::RocketMQResult;
pub const MESSAGE_COUNT: usize = 10;
pub const PRODUCER_GROUP: &str = "please_rename_unique_group_name";
pub const DEFAULT_NAMESRVADDR: &str = "127.0.0.1:9876";
pub const TOPIC: &str = "TopicTest";
pub const TAG: &str = "TagA";

#[tokio::main]
pub async fn main() -> RocketMQResult<()> {
    // Initialize the logger.
    rocketmq_common::log::init_logger();

    // Create a transactional producer and register the transaction listener.
    let builder = TransactionMQProducer::builder();
    let mut producer = builder
        .producer_group(PRODUCER_GROUP.to_string())
        .name_server_addr(DEFAULT_NAMESRVADDR.to_string())
        .topics(vec![TOPIC])
        .transaction_listener(TransactionListenerImpl::default())
        .build();

    producer.start().await?;

    // Each message is sent as a half message; the listener decides its outcome.
    for _ in 0..MESSAGE_COUNT {
        let message = Message::with_tags(TOPIC, TAG, "Hello RocketMQ".as_bytes());
        let send_result = producer
            .send_message_in_transaction::<()>(message, None)
            .await?;
        println!("send result: {}", send_result);
    }

    // Keep the process alive so the Broker can back-check pending transactions.
    let _ = tokio::signal::ctrl_c().await;
    producer.shutdown().await;
    Ok(())
}
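
/// Demo listener that records a simulated status for each transaction id.
struct TransactionListenerImpl {
    // Transaction id -> simulated local transaction status (0, 1, or 2).
    local_trans: Arc<Mutex<HashMap<CheetahString, i32>>>,
    // Counter used to rotate through the simulated statuses.
    transaction_index: AtomicI32,
}

impl Default for TransactionListenerImpl {
    fn default() -> Self {
        Self {
            local_trans: Arc::new(Default::default()),
            transaction_index: Default::default(),
        }
    }
}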

impl TransactionListener for TransactionListenerImpl {
    // Called after the half message has been sent successfully.
    fn execute_local_transaction(
        &self,
        msg: &Message,
        _arg: Option<&(dyn Any + Send + Sync)>,
    ) -> LocalTransactionState {
        // Simulate a local transaction outcome: the status cycles through 0, 1, 2.
        let value = self
            .transaction_index
            .fetch_add(1, std::sync::atomic::Ordering::AcqRel);
        let status = value % 3;
        let mut guard = self.local_trans.lock();
        guard.insert(
            msg.get_transaction_id().cloned().unwrap_or_default(),
            status,
        );
        // Return Unknown on purpose so that the Broker triggers a back-check.
        LocalTransactionState::Unknown
    }

    // Called when the Broker back-checks an unresolved half message.
    fn check_local_transaction(&self, msg: &MessageExt) -> LocalTransactionState {
        let guard = self.local_trans.lock();
        // A transaction id that was never recorded maps to -1, which stays Unknown.
        let status = guard
            .get(&msg.get_transaction_id().cloned().unwrap_or_default())
            .unwrap_or(&-1);
        match status {
            1 => LocalTransactionState::CommitMessage,
            2 => LocalTransactionState::RollbackMessage,
            _ => LocalTransactionState::Unknown,
        }
    }
}
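
To see what the demo listener resolves to during back-checks, the following standalone sketch replays its status arithmetic without the RocketMQ crates. It assumes the counter starts at 0 (the `AtomicI32` default) and that each message is back-checked once; `State`, `state_for`, and `tally` are hypothetical names used only here.

```rust
/// Mirror of the three listener outcomes (hypothetical type for this sketch).
#[derive(Debug, PartialEq)]
enum State {
    Commit,
    Rollback,
    Unknown,
}

/// Same mapping as the `match` in `check_local_transaction`.
fn state_for(status: i32) -> State {
    match status {
        1 => State::Commit,
        2 => State::Rollback,
        _ => State::Unknown,
    }
}

/// Counts (commit, rollback, unknown) for messages with indexes 0..n,
/// where each message's status is `index % 3` as in the listener.
fn tally(n: i32) -> (usize, usize, usize) {
    let mut counts = (0, 0, 0);
    for index in 0..n {
        match state_for(index % 3) {
            State::Commit => counts.0 += 1,
            State::Rollback => counts.1 += 1,
            State::Unknown => counts.2 += 1,
        }
    }
    counts
}

fn main() {
    let (commit, rollback, unknown) = tally(10);
    println!("commit={commit} rollback={rollback} unknown={unknown}");
}
```

For ten messages this gives 3 commits, 3 rollbacks, and 4 that remain unknown, so in the demo only the committed messages would reach consumers.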