ActiveMQ in Action-5.2 The KahaDB message store- 高飞网

5.2 The KahaDB message store

2016-05-19 15:52:23.0

5.2 KahaDB消息存储

    自从ActiveMQ5.3以后,推荐使用的通用消息存储就是Kaha DB。这是一种基于文件的消息存储,包含事务型的日志,支持消息的存储与恢复,具有良好的性能和可扩展性。

    KahaDB是一个基于文件的,事务型存储,可以微调,并设计为快速的消息存储。KahaDB的目标是易用,且尽可能地快。使用文件数据库,就意味着没有第三方数据库这个先决条件。这个存储使得ActiveMQ从下载到运行只需几分钟。另外,KahaDB的文件目录为了消息服务端的需要,设计的非常精简。

    KahaDB消息存储使用事务日志存储索引,并只用一个索引文件应对所有的目标消息。它已经用于10000个连接的生产环境当中,每个连接都有自己的queue。可配置的KahaDB,意味着可以微调以用于更多的场景,从高吞吐的应用(如交易平台),到存储海量消息(如GPS定位)。

    注:译者在windows环境未做任何配置的情况下,只能连接到1000个连接,再多就会报异常:

Exception in thread "main" javax.jms.JMSException: Cannot send, channel has already failed: tcp://127.0.0.1:61616
	at org.apache.activemq.util.JMSExceptionSupport.create(JMSExceptionSupport.java:72)
	at org.apache.activemq.ActiveMQConnection.syncSendPacket(ActiveMQConnection.java:1407)
	at org.apache.activemq.ActiveMQConnection.ensureConnectionInfoSent(ActiveMQConnection.java:1496)
	at org.apache.activemq.ActiveMQConnection.start(ActiveMQConnection.java:523)
	at org.apache.activemq.book.ch4_my.Publisher.wConn1Msg(Publisher.java:30)
	at org.apache.activemq.book.ch4_my.Publisher.main(Publisher.java:15)
Caused by: org.apache.activemq.transport.InactivityIOException: Cannot send, channel has already failed: tcp://127.0.0.1:61616
	at org.apache.activemq.transport.AbstractInactivityMonitor.doOnewaySend(AbstractInactivityMonitor.java:327)
	at org.apache.activemq.transport.AbstractInactivityMonitor.oneway(AbstractInactivityMonitor.java:316)
	at org.apache.activemq.transport.TransportFilter.oneway(TransportFilter.java:85)
	at org.apache.activemq.transport.WireFormatNegotiator.oneway(WireFormatNegotiator.java:116)
	at org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:68)
	at org.apache.activemq.transport.ResponseCorrelator.asyncRequest(ResponseCorrelator.java:81)
	at org.apache.activemq.transport.ResponseCorrelator.request(ResponseCorrelator.java:86)
	at org.apache.activemq.ActiveMQConnection.syncSendPacket(ActiveMQConnection.java:1382)
	... 4 more

    启动ActiveMQKahaDB,你只需要配置activemq.xml中的<persistenceAdapter>节点。下面是最小的配置:

<broker brokerName="broker" persistent="true" useShutdownHook="false">
...
<persistenceAdapter>
<kahaDB directory="activemq-data" journalMaxFileLength="16mb"/>
</persistenceAdapter>
...
</broker>

    如果你想将ActiveMQ内置到你的应用中,消息存储也可以使用编程式配置。下面是示例:

public class EmbeddedBrokerUsingAMQStoreExample {
    BrokerService createEmbeddedBroker() throws Exception {
        BrokerService broker = new BrokerService();
        File dataFileDir = new File("target/amq-in-action/kahadb");
        KahaDBStore kaha = new KahaDBStore();
        kaha.setDirectory(dataFileDir);
        // Using a bigger journal file
        kaha.setJournalMaxFileLength(1024 * 100);
        // small batch means more frequent and smaller writes
        kaha.setIndexWriteBatchSize(100);
        // do the index write in a separate thread
        kaha.setEnableIndexWriteAsync(true);
        broker.setPersistenceAdapter(kaha);
        // create a transport connector
        broker.addConnector("tcp://localhost:61616");
        // start the broker
        broker.start();
        return broker;
    }
}

    示例虽然很小,但也足以创建一个使用KahaDB存储的ActiveMQ服务端了,并且监听着通过TCP连接到ActiveMQ的客户端。要了解更多的关于嵌入式ActiveMQ,可查看第8章内容。

    注:可以通过下面的命令,使用配置使用KahaDB的activemq_kaha.xml,并启动服务。

D:\software\apache-activemq-5.13.2>bin\activemq.bat start 
xbean:d:/software/apache-activemq-5.13.2/conf/activemq_kaha.xml

    为了能更好的使用和配置KahaDB,研究一下Kaha消息存储的内部实现是很重要的。


5.2.1 KahaDB内部实现

    KahaDB消息存储,是所有已提供的消息存储实现中速度最快的。其速度是综合了数据记录文件组成的快速事务日志,高度优化的消息ID索引和内存消息缓存的结果。图5.3展示了一个KahaDB预览。

    图中提供了KahaDB包含的三个独立的部分,如下:

     数据日志(data log) 扮演了消息日志的角色,它包含了滚动的消息日志和存储在特定长度的文件中的命令(如事务边界和消息删除)。当当前使用的数据文件达到最大长度时,一个新的数据文件会随之创建。所有数据文件中的消息上都有引用计数,因此一旦文件中的消息不再有需要了,这个文件就会被归档或删除。在数据文件中,消息以追加的方式添加到当前文件末尾,因此存储非常之快。

     如果存在接收消息的活跃消费者,缓存将消息临时保存。如果这些活跃的消费者,消息被分发的同时会被安排存储起来。如果消息及时被确认了,就无需写入磁盘。

     BTree索引通过对消息ID的索引保留了对在数据文件中消息的引用。索引为queue维护了先进先出的数据结构和持久订阅者指向它们的topic消息。redo日志仅在ActiveMQ服务端没有完整地关闭时使用,通常是要确保维护完整的BTree索引。

    KahaDB用磁盘上的不同文件来记录数据日志和索引,所以下面的章节我们会讲解经典的KahaDB目录发结构。


5.2.2 KahaDB目录结构

    当你启动了配置了KahaDB存储的ActiveMQ服务端后,会自动创建一个目录,其中保存了持久化消息,这个文件结构如图5.4。


    在KahaDB目录中,可看到下面的目录和文件结构:

    数据日志文件(db log files):KahaDB存储消息到预定义大小的数据记录文件中,文件命名为db-<Number>.log。当数据文件已满时,一个新的文件会随之创建,number数值也会随之递增。当不再有引用到数据文件中的任何消息时,文件会被删除或归档。
    归档目录(archive directory):只有当归档启用时才会存在。归档用于存储那些KahaDB不再需要的数据记录,使日后再想恢复消息成为可能。如果归档没有被启用(默认情况),那么不再使用的数据会被直接从文件系统中删除。

    db.data:该文件包含了持久化的BTree索引,索引了消息数据记录中的消息。
    db.redo:这是redo文件,如果KahaDB消息存储在强制退出后启动,用于恢复BTree索引。


现在已经了解了基本的KahaDB存储,下一步让我看学习下它的配置。


5.2.3 配置KahaDB

KahaDB可以在activemq.xml中配置。它的配置选项控制不同的微调参数,如表格5.1所示。

表5.1 配置KahaDB存储的可用选项

属性名
默认值
描述
directory
activemq-data
KahaDB使用的目录路径
indexWriteBatchSize
1000
批量写入磁盘的索引页数
indexCacheSize
1000
内存中索引页数的缓存大小
enableIndexWriteAsync
false
如果设置了,会异步写入索引
journalMaxFileLength
false
设置每个消息数据记录文件的最大空间
enableJournalDiskSyncs
true
确保每个非事务记录跟随磁盘sync写入(JMS 耐久性要求)
cleanupInterval
30000
检查需要抛弃或移除的不再使用的消息数据记录的时间间隔,单位毫秒
checkpointInterval
30000
Time (ms) before checkpointing the journal ignore 
MissingJournalfiles
false
如果启用, 将忽略丢失的消息日志文件
checkForCorruptJournalFiles
false如果启用, 启动时将校验消息数据记录是否被损坏
checksumJournalFilesfalse如果启用,将为每个消息数据记录提供校验和(checksum)
archiveDataLogsfalse如果启用, 将会把消息数据记录移动到归档目录而不是删除它们
directoryArchivenull定义了当消息数据记录中包含的所有消息都被消费过后,将数据记录的移动过程路径databaseLockedWaitDelay  10000 定义了试图获取数据锁的等待时间,单位毫秒(用于主从共享数据时)
maxAsyncJobs10000排队等待存储的异步消息最大数目(最好与并发消息生产者相同)
concurrentStoreAndDispatchTransactionstrue允许消息分发与事务存储并发地进行
concurrentStoreAndDispatchTopicstrue允许topic消息分发与事务存储并发地进行
concurrentStoreAndDispatchQueuestrue允许queue消息分发与事务存储并发地进行

    AcitveMQ为消息存储提供了插件式的API,除KahaDB外还有有三种实现与ActiveMQ一起发布:
    AMQ消息存储:为高性能设计的基于文件的消息存储
    JDBC:基于JDBC的消息存储
    内存消息存储:基于内存的消息存储

    下面的三节中我们将学习额外三种消息存储的使用和配置。首先是AMQ存储,像KahaDB一样的文件存储实现。它早于KahaDB出现,但因为它的性能特性,使用AMQ代替KahaDB也是有意义的,提供的持久化目标数量相对较低。


参考:Apache ActiveMQ ™ -- KahaDB