点击上方蓝色“ 网路冷眼” 可以订阅哦!


架构|如何以1美元使用Netty、Akka和Cassandra将服务器性能提高到每小时处理1亿条消息

Thingsboard是一个用于数据收集,处理,可视化和设备管理的开源IoT平台。它通过工业标准IoT协议(MQTT,CoAP和HTTP)实现设备连接,并支持云和内部部署。 Thingsboard兼具可扩展性,容错性和性能,因此不会丢失数据。本文介绍了如何以1美元的使用Netty、Akka和Cassandra将服务器性能提高到每小时处理1亿条消息。供大家参考。

架构

数据流和测试工具

性能改进步骤

步骤1:异步Cassandra驱动程序API

步骤2:连接池

步骤3:垂直缩放

步骤4:水平缩放

如何重复测试

结论

Thingsboard开源IoT平台的一个关键特性是数据收集,这是必须在高负载下可靠运行的关键特性。在本文中,我们将描述所做的步骤和改进,以确保Thingsboard服务器的单一实例能够每秒不断处理20,000个以上的设备和30,000+条MQTT发布消息,总而言之,每分钟能处理大约200万条发布的消息。

架构

Thingsboard性能利用三个主要项目:

Netty用于 IoT 设备的高性能MQTT服务器/代理。

Akka为高性能actor系统协调数百万设备之间的消息。

Cassandra用于可扩展的高性能NoSQL DB,用于存储来自设备的时间序列数据。

我们还使用Zookeeper进行协调和以集群模式使用gRPC。有关更多详细信息,请参阅平台架构。

数据流和测试工具

IoT设备通过MQTT连接到Thingsboard服务器,并使用JSON格式发出“publish”命令。单个发布消息的大小约为100字节。 MQTT是轻量级的发布/订阅消息传递协议,并且相对于HTTP请求/响应协议提供了许多优点。


架构|如何以1美元使用Netty、Akka和Cassandra将服务器性能提高到每小时处理1亿条消息

Thingsboard服务器处理MQTT发布消息并将它们以异步方式存储到Cassandra。服务器还可以从Web UI仪表板(如果存在的话)将数据推送到WebSocket订阅。我们尝试避免任何阻塞操作,这对于整体系统性能至关重要。 Thingsboard支持MQTT QoS级别1,这意味着客户端只有在将数据存储到Cassandra DB后才会接收对发布消息的响应。 QoS级别1可能的数据复制只是覆盖对应的Cassandra行,因此不会出现在持久性数据中。此功能提供可靠的数据传递和持久性。

我们使用了基于Akka和Netty的Gatling负载测试框架。 Gatling能够使用2核CPU的5-10%来模拟10K MQTT客户端。请参阅我们的单独文章,了解我们如何改进非官方GatlingMQTT插件以支持测试用例。

性能改进步骤 步骤1.异步Cassandra驱动程序API

在一台带有SSD的4核的现代笔记本电脑第一次性能测试的结果是相当糟糕。平台每秒只能处理200条消息。根本原因和主要性能瓶颈相当明显,很容易找出来。看来,处理不是 100%异步,我们正在执行阻塞 API 调用Cassandra驱动程序在Telemetry插件中的actor。快速重构插件实现使性能提高了10 倍以上,从1000个设备每秒收到大约 2500 条已发布的消息。我们想推荐这篇文章关于异步查询Cassandra。

步骤2.连接池

我们决定迁移到AWS EC2实例,以便能够共享我们执行的结果和测试。我们开始在 c4.xlarge 实例(4个vCPU和7.5 Gb的RAM)上运行测试,Cassandra和Thingsboard服务位于同一位置。


架构|如何以1美元使用Netty、Akka和Cassandra将服务器性能提高到每小时处理1亿条消息

测试规格:

设备数:10 ,000

每个设备的发布频率:每秒一次

总负载:每秒10,000条消息

第一个测试结果显然是不可接受的:


架构|如何以1美元使用Netty、Akka和Cassandra将服务器性能提高到每小时处理1亿条消息

上面的巨大响应时间是由于服务器根本无法每秒处理10 K个消息,因为它们正在排队。

我们已经开始调查测试实例上的内存和CPU负载。最初我们对性能低下的猜测是由于 CPU 或 RAM 上的超负载造成的。但事实上在负载测试期间,看到CPU在特定的时刻空闲了几秒钟。这个“暂停”事件每3-7秒发生一次,见下图:


架构|如何以1美元使用Netty、Akka和Cassandra将服务器性能提高到每小时处理1亿条消息

作为下一步,我们决定在这些暂停期间执行线程转储。我们期望看到被阻塞的线程,这可以给我们在暂停期间发生了什么的一些线索。因此,我们打开分开的控制台来监视CPU负载,另一个控制台执行压力测试时使用以下命令执行线程转储:

kill -3 THINGSBOARD_PID

我们已经确定,在暂停期间,总有一个线程处于TIMED_WAITING状态,根本原因是Cassandra驱动程序的方法awaitAvailableConnection:

java.lang.Thread.State:TIMED_WAITING (parking)

at sun.misc.Unsafe.park(NativeMethod)

parking to wait for <0x0000000092d9d390> (ajava.util.concurrent.locks.AbstractQueuedSynchronizer $ConditionObject )

atjava.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)

atjava.util.concurrent.locks.AbstractQueuedSynchronizer $ConditionObject .await(AbstractQueuedSynchronizer.java:2163)

atcom.datastax.driver.core.HostConnectionPool.awaitAvailableConnection(HostConnectionPool.java:287)

atcom.datastax.driver.core.HostConnectionPool.waitForConnection(HostConnectionPool.java:328)

atcom.datastax.driver.core.HostConnectionPool.borrowConnection(HostConnectionPool.java:251)

atcom.datastax.driver.core.RequestHandler $SpeculativeExecution .query(RequestHandler.java:301)

atcom.datastax.driver.core.RequestHandler $SpeculativeExecution .sendRequest(RequestHandler.java:281)

atcom.datastax.driver.core.RequestHandler.startNewExecution(RequestHandler.java:115)

atcom.datastax.driver.core.RequestHandler.sendRequest(RequestHandler.java:91)

atcom.datastax.driver.core.SessionManager.executeAsync(SessionManager.java:132)

atorg.thingsboard.server.dao.AbstractDao.executeAsync(AbstractDao.java:91)

atorg.thingsboard.server.dao.AbstractDao.executeAsyncWrite(AbstractDao.java:75)

atorg.thingsboard.server.dao.timeseries.BaseTimeseriesDao.savePartition(BaseTimeseriesDao.java:135)

因此,我们意识到在用例中,cassandra驱动程序的默认连接池配置导致了错误的结果。

连接池功能的正式配置包含特殊选项“每个连接的同时请求数”,允许您调整每个连接的并发请求。我们使用cassandra驱动程序协议v3,默认情况下使用下一个值:

1024为LOCAL主机

256用于REMOTE主机

考虑到我们实际上从10,000个设备中提取数据,默认值是绝对不够的。因此,我们对LOCAL和REMOTE主机的代码和更新值进行了更改,并将其设置为最大可能值:

poolingOptions

. setMaxRequestsPerConnection (HostDistance. LOCAL , 32768 )

. setMaxRequestsPerConnection (HostDistance. REMOTE , 32768 );


架构|如何以1美元使用Netty、Akka和Cassandra将服务器性能提高到每小时处理1亿条消息
架构|如何以1美元使用Netty、Akka和Cassandra将服务器性能提高到每小时处理1亿条消息

最终结果虽然好多了,但远不能达到每分钟100万条消息的性能。在对c4.xlarge的测试中,我们还没有看到CPU负载的暂停。整个测试期间CPU负载很高(80-95%)。我们做了几个线程转储来验证cassandra驱动程序不等待可用的连接,确实再没有看到这个问题了。

步骤3:垂直缩放

我们决定在两个更强大的节点 c4.2xlarge 上运行相同的测试,节点配置吗8个vCPU和15Gb的RAM。性能提高不是线性的,CPU仍然负载(80-90%)。


架构|如何以1美元使用Netty、Akka和Cassandra将服务器性能提高到每小时处理1亿条消息

我们注意到响应时间有显着改善。在测试开始的最大峰值之后,最大响应时间在200ms内,平均响应时间为50ms。


架构|如何以1美元使用Netty、Akka和Cassandra将服务器性能提高到每小时处理1亿条消息

每秒的请求数大约为10K。


架构|如何以1美元使用Netty、Akka和Cassandra将服务器性能提高到每小时处理1亿条消息

我们还对c4.4xlarge执行测试,它配置有16个vCPU和30Gb RAM,但没有发现到重大改进,决定分离Thingsboard服务器,并将Cassandra移动到三个节点集群。

步骤4:水平缩放

我们的主要目标是确定使用在c4.2xlarge上运行的单一Thingsboard服务器到底可以处理多少条MQTT消息。我们将在另一篇文章中介绍Thingsboard集群的水平可伸缩性。因此,我们决定将Cassandra移动到三个具有默认配置的c4.xlarge单独实例上,并同时从两个单独的c4.xlarge实例启动gatling应力测试工具,以尽量减少第三方对延迟和吞吐量的可能影响。


架构|如何以1美元使用Netty、Akka和Cassandra将服务器性能提高到每小时处理1亿条消息

测试规格:

设备数:20,000

每台设备的发布频率:每秒两次

总负载:每秒40,000条消息

在不同客户端机器上启动的两个同时测试运行的统计信息如下所示。


架构|如何以1美元使用Netty、Akka和Cassandra将服务器性能提高到每小时处理1亿条消息
架构|如何以1美元使用Netty、Akka和Cassandra将服务器性能提高到每小时处理1亿条消息

本文数据库(综合)相关术语:系统安全软件

分页:12
转载请注明
本文标题:架构|如何以1美元使用Netty、Akka和Cassandra将服务器性能提高到每小时处理1亿条消息
本站链接:http://www.codesec.net/view/533461.html
分享请点击:


1.凡CodeSecTeam转载的文章,均出自其它媒体或其他官网介绍,目的在于传递更多的信息,并不代表本站赞同其观点和其真实性负责;
2.转载的文章仅代表原创作者观点,与本站无关。其原创性以及文中陈述文字和内容未经本站证实,本站对该文以及其中全部或者部分内容、文字的真实性、完整性、及时性,不作出任何保证或承若;
3.如本站转载稿涉及版权等问题,请作者及时联系本站,我们会及时处理。
登录后可拥有收藏文章、关注作者等权限...
技术大类 技术大类 | 数据库(综合) | 评论(0) | 阅读(59)