ActiveMQ in Action-5.6 Caching messages in the broker for consumers- 高飞网

5.6 Caching messages in the broker for consumers

2016-05-30 22:30:07.0

5.6 在服务端为消费者缓存消息

    即使消息持久化最重要的方面是消息上会在长期存储中存活,然而仍有很多情况下,消息需要对那些从服务端断开连接的消费者还可用,但将消息持久化到数据库中太慢了。交易平台实时传递价格信息数据就是一个很好的案例。但典型的实时数据应用只在有限的时间内使用消息,经常少于1分钟。所以将它们持久化到外部存储系统没有什么意义,因为新的消息很快就到达。

    ActiveMQ支持在服务端对这些类型的使用消息缓存的系统缓存消息,通过使用称为订阅恢复的策略(subscription recovery policy)。这种配置策略为了决定哪种消息应该被缓存,缓存多少以及多长时间而使用。本节剩下的内存我们会解释消息缓存在ActiveMQ如何工作,如何去配置可用的不同类型的订阅恢复机制。


5.6.1 如何为消费者缓存消息

    ActiveMQ消息服务端在内存中为使用的每个topic缓存消息。不支持的topic类型仅有临时(temporary)topic和ActiveMQ公告(advisory)topic。这种方式缓存的消息不能处理queue,因为通常一个queue的操作是将每个消息发送给它。服务端缓存的消息只会被分发到可追溯的消费者,而永久不会转发到持久topic订阅者。

    Topic消费者通过在消费者被创建时设置destination属性被标记为可追溯的。下面是一个示例:

import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.Topic;

public void createRetroactiveConsumer() throws JMSException {
    ConnectionFactory fac = new ActiveMQConnectionFactory();
    Connection connection = fac.createConnection();
    connection.start();
    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    Topic topic = session.createTopic("TEST.TOPIC?consumer.retroactive=true");
    MessageConsumer consumer = session.createConsumer(topic);
}

    在服务端这边,消息缓存是由被称为subscriptionRecoveryPolicy的目的策略所控制的。服务端使用的默认的订阅恢复策略是FixedSizeSubscriptionRecoveryPolicy,让我看体验一下可用的不同订阅恢复策略。


5.6.2 ActiveMQ订阅恢复策略

    对持久性微调允许有许多不同的策略和为非持久topic消费者缓存的消息类型。每个策略类型在这都会介绍到。

ActiveMQ固定内存大小的订阅恢复策略
    这种策略限制了为topic缓存的消息数量,基于使用的内存大小。这是ActiveMQ默认使用的订阅恢复策略。你可以把它应用于所有的topic或者也可以对某个topic使用。下面5.6表中列出了可用的属性。
表5.6 固定大小的订阅恢复策略的配置属性

属性名
默认值
描述
maximumSize
6553600
用字节表示的缓存的内存大小。
useSharedBuffer
true
若设置为true,申请的内存总量将用在所有的topic上

ActiveMQ固定数量的订阅恢复策略
    该策略限制了topic缓存消息的个数,基于一个静态数值。只有一个可用属性,见表5.7:
表5.7 ActiveMQ固定数量的订阅恢复策略的配置属性

属性名
默认值
描述
maximumSize
100在topic缓存中允许的消息总数

ActiveMQ基于查询的订阅恢复策略
    该策略限制了缓存的消息数量,基于一个应用到每个消息的JMS属性选择器。只有一个可用属性见表5.8
表5.8 基于查询的订阅恢复策略的可用属性

属性名
默认值
描述
query
null
只缓存查询到的消息

ActiveMQ时间限制的订阅恢复策略
    该策略限制了topic缓存消息的数量,通过一个应用到每个消息的过期时间。注意消息上的这个过期时间独立于MessageProducer对象上设置的timeToLive。表5.9列出了该策略的可用配置属性。
表5.9 为定时订阅恢复策略配置的属性

属性名
默认值
描述
recoverDuration
60000
在缓存中保存消息的毫秒时间

ActiveMQ最新镜像订阅恢复策略
    该策略只保持最后发送到topic的消息。这可能对那些实时性价目信息系统很有用——这种系统中每个topic使用的价格,你可能只想使用最后发给topic的价格信息。该策略中没有可配置的属性。


ActiveMQ非订阅恢复策略
该策略禁用对topic的消息缓存,该策略中没有任何属性可用。


5.6.3 配置订阅恢复策略

    在ActiveMQ配置文件中,你可以为特定的topic配置(subscriptionRecoveryPolicy)策略,也可以使用通配符。下面是一个简单的配置示例:
<?xml version="1.0" encoding="UTF-8"?>
<beans>
    <broker brokerName="test-broker" persistent="true"
        useShutdownHook="false" deleteAllMessagesOnStartup="true"
        xmlns="http://activemq.apache.org/schema/core">
        <transportConnectors>
            <transportConnector uri="tcp://localhost:61635" />
        </transportConnectors>
        <destinationPolicy>
            <policyMap>
                <policyEntries>
                    <policyEntry topic="Topic.FixedSizedSubs.>">
                        <subscriptionRecoveryPolicy>
                            <fixedSizeSubscriptionRecoveryPolicy
                                maximumSize="2000000" useSharedBuffer="false" />
                        </subscriptionRecoveryPolicy>
                    </policyEntry>
                    <policyEntry topic="Topic.LastImageSubs.>">
                        <subscriptionRecoveryPolicy>
                            <lastImageSubscriptionRecoveryPolicy />
                        </subscriptionRecoveryPolicy>
                    </policyEntry>

                    <policyEntry topic="Topic.NoSubs.>">
                        <subscriptionRecoveryPolicy>
                            <noSubscriptionRecoveryPolicy />
                        </subscriptionRecoveryPolicy>
                    </policyEntry>
                    <policyEntry topic="Topic.TimedSubs.>">
                        <subscriptionRecoveryPolicy>
                            <timedSubscriptionRecoveryPolicy
                                recoverDuration="25000" />
                        </subscriptionRecoveryPolicy>
                    </policyEntry>
                </policyEntries>
            </policyMap>
        </destinationPolicy>
    </broker>
</beans>





上一篇:5.5 The memory message store
下一篇:5.7 Summary