Architecture

⚙️ RocketMQ 事务消息模型原理

RocketMQ 的事务消息流程分为 三步

1. 发送“半消息”(Prepared Message)

生产者发送一个特殊的“半消息”,这个消息:

  • 不会对消费者可见。
  • 表示消息发送成功,但还未提交。

2. 执行本地事务

在发送半消息成功后,生产者执行本地事务逻辑(如数据库操作等)。

3. 提交 or 回滚事务消息

  • 本地事务执行成功 ⇒ 提交事务消息(消息变为可投递状态,消费者可消费)。
  • 本地事务失败 ⇒ 回滚事务消息(消息被删除,永远不会投递)。

✅ 可靠回查机制(事务状态回查)

如果 Broker 长时间未收到提交或回滚指令,会 主动询问生产者 本地事务状态。

防止因生产者宕机或网络抖动,导致半消息悬挂不决。

快速开始

[dependencies]
tokio = "1.45.1"
rocketmq-client-rust = { version = "0.5.0"}
rocketmq-error = { version = "0.5.0" }
rocketmq-common = { version = "0.5.0" }
cheetah-string ={version = "0.1.6"}
parking_lot = "0.12.4"

代码:

use std::any::Any;
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::atomic::AtomicI32;

use cheetah_string::CheetahString;
use parking_lot::Mutex;
use rocketmq_client_rust::producer::local_transaction_state::LocalTransactionState;
use rocketmq_client_rust::producer::mq_producer::MQProducer;
use rocketmq_client_rust::producer::transaction_listener::TransactionListener;
use rocketmq_client_rust::producer::transaction_mq_producer::TransactionMQProducer;
use rocketmq_common::common::message::MessageTrait;
use rocketmq_common::common::message::message_ext::MessageExt;
use rocketmq_common::common::message::message_single::Message;
use rocketmq_error::RocketMQResult;

pub const MESSAGE_COUNT: usize = 1;
pub const PRODUCER_GROUP: &str = "please_rename_unique_group_name";
pub const DEFAULT_NAMESRVADDR: &str = "127.0.0.1:9876";
pub const TOPIC: &str = "TopicTest";
pub const TAG: &str = "TagA";

#[tokio::main]
pub async fn main() -> RocketMQResult<()> {
    //init logger
    rocketmq_common::log::init_logger();

    // create a producer builder with default configuration
    let builder = TransactionMQProducer::builder();

    let mut producer = builder
        .producer_group(PRODUCER_GROUP.to_string())
        .name_server_addr(DEFAULT_NAMESRVADDR.to_string())
        .topics(vec![TOPIC])
        .transaction_listener(TransactionListenerImpl::default())
        .build();

    producer.start().await?;

    for _ in 0..10 {
        let message = Message::with_tags(TOPIC, TAG, "Hello RocketMQ".as_bytes());
        let send_result = producer
            .send_message_in_transaction::<()>(message, None)
            .await?;
        println!("send result: {}", send_result);
    }
    let _ = tokio::signal::ctrl_c().await;
    producer.shutdown().await;

    Ok(())
}

struct TransactionListenerImpl {
    local_trans: Arc<Mutex<HashMap<CheetahString, i32>>>,
    transaction_index: AtomicI32,
}

impl Default for TransactionListenerImpl {
    fn default() -> Self {
        Self {
            local_trans: Arc::new(Default::default()),
            transaction_index: Default::default(),
        }
    }
}

impl TransactionListener for TransactionListenerImpl {
    fn execute_local_transaction(
        &self,
        msg: &Message,
        _arg: Option<&(dyn Any + Send + Sync)>,
    ) -> LocalTransactionState {
        let value = self
            .transaction_index
            .fetch_add(1, std::sync::atomic::Ordering::AcqRel);
        let status = value % 3;
        let mut guard = self.local_trans.lock();
        guard.insert(
            msg.get_transaction_id().cloned().unwrap_or_default(),
            status,
        );
        LocalTransactionState::Unknown
    }

    fn check_local_transaction(&self, msg: &MessageExt) -> LocalTransactionState {
        let guard = self.local_trans.lock();
        let status = guard
            .get(&msg.get_transaction_id().cloned().unwrap_or_default())
            .unwrap_or(&-1);
        match status {
            1 => LocalTransactionState::CommitMessage,
            2 => LocalTransactionState::RollbackMessage,
            _ => LocalTransactionState::Unknown,
        }
    }
}

Updated: