Cassandra Source Code Analysis: Network


Cassandra 2.2 source code analysis: the Netty client/server, request handling, and the messaging service.

CassandraDaemon

The startup log below reflects the order in which the components start:

INFO [main] 2016-10-11 15:39:47,410 ColumnFamilyStore.java:382 - Initializing system.sstable_activity
INFO [main] 2016-10-11 15:39:48,950 CacheService.java:111 - Initializing key cache with capacity of 49 MBs.
INFO [main] 2016-10-11 15:39:48,967 CacheService.java:133 - Initializing row cache with capacity of 0 MBs
INFO [main] 2016-10-11 15:39:48,972 CacheService.java:162 - Initializing counter cache with capacity of 24 MBs
INFO [main] 2016-10-11 15:39:48,974 CacheService.java:173 - Scheduling counter cache save to every 7200 seconds (going to save all keys).
INFO [main] 2016-10-11 15:39:49,080 ColumnFamilyStore.java:382 - Initializing system.hints
INFO [main] 2016-10-11 15:39:49,089 ColumnFamilyStore.java:382 - Initializing system.......
INFO [main] 2016-10-11 15:39:51,302 ColumnFamilyStore.java:382 - Initializing demo.test
INFO [main] 2016-10-11 15:39:51,716 Index.java:93 - Initializing Lucene index
INFO [main] 2016-10-11 15:39:52,405 Index.java:101 - Initialized index demo.test.idx
INFO [main] 2016-10-11 15:39:52,413 ColumnFamilyStore.java:382 - Initializing demo.tweets
INFO [main] 2016-10-11 15:39:52,419 AutoSavingCache.java:163 - Completed loading (1 ms; 21 keys) KeyCache cache
INFO [main] 2016-10-11 15:39:52,497 CommitLog.java:168 - Replaying bin/../data/commitlog/CommitLog-5-1474959171115.log, ....
INFO [main] 2016-10-11 15:39:52,739 CommitLog.java:170 - Log replay complete, 135 replayed mutations
INFO [main] 2016-10-11 15:39:52,969 StorageService.java:600 - Cassandra version: 2.2.6
INFO [main] 2016-10-11 15:39:52,969 StorageService.java:601 - Thrift API version: 20.1.0
INFO [main] 2016-10-11 15:39:52,969 StorageService.java:602 - CQL supported versions: 3.3.1 (default: 3.3.1)
INFO [main] 2016-10-11 15:39:53,010 IndexSummaryManager.java:85 - Initializing index summary manager with a memory pool size of 49 MB and a resize interval of 60 minutes
INFO [main] 2016-10-11 15:39:53,013 StorageService.java:621 - Loading persisted ring state
INFO [main] 2016-10-11 15:39:53,056 StorageService.java:794 - Starting up server gossip
INFO [main] 2016-10-11 15:39:53,247 MessagingService.java:540 - Starting Messaging Service on localhost/127.0.0.1:7000 (lo0)
INFO [main] 2016-10-11 15:39:53,318 StorageService.java:968 - Using saved tokens [-1036061867878377743, -1049032071638556980, ]
INFO [main] 2016-10-11 15:39:53,425 StorageService.java:1937 - Node localhost/127.0.0.1 state jump to NORMAL
INFO [main] 2016-10-11 15:39:53,785 Server.java:151 - Netty using Java NIO event loop
INFO [main] 2016-10-11 15:39:53,970 Server.java:185 - Starting listening for CQL clients on localhost/127.0.0.1:9042...
INFO [main] 2016-10-11 15:39:54,159 CassandraDaemon.java:439 - Not starting RPC server as requested. Use JMX (StorageService->startRPCServer()) or nodetool (enablethrift) to start it

Stopping Cassandra shuts down the CassandraDaemon, the Server (nativeServer), and the Gossiper. Since the ThriftServer is not started by default, it does not need to be stopped.

INFO [RMI TCP Connection(5)-127.0.0.1] 2016-10-11 15:49:05,275 CassandraDaemon.java:451 - Cassandra shutting down...
INFO [RMI TCP Connection(5)-127.0.0.1] 2016-10-11 15:49:05,286 Server.java:218 - Stop listening for CQL clients
INFO [StorageServiceShutdownHook] 2016-10-11 15:49:05,292 Gossiper.java:1448 - Announcing shutdown

CassandraDaemon is the startup class. It holds three main service classes:

StorageService: the storage-related service
ThriftServer: the Thrift protocol server
Server: the native transport server

private static final CassandraDaemon instance = new CassandraDaemon();
public static void main(String[] args) {
instance.activate();
}
public void activate(){
setup();
start();
}
protected void setup(){
StorageService.instance.initServer();
int rpcPort = DatabaseDescriptor.getRpcPort();
int nativePort = DatabaseDescriptor.getNativeTransportPort();
thriftServer = new ThriftServer(rpcAddr, rpcPort, listenBacklog);
nativeServer = new org.apache.cassandra.transport.Server(nativeAddr, nativePort);
}
public void start() {
nativeServer.start();
thriftServer.start();
}

Ports in the configuration file and their corresponding implementation classes:

storage_port: 7000 --> StorageService
native_transport_port: 9042 --> nativeServer
rpc_port: 9160 --> ThriftServer
start_native_transport: true --> native protocol enabled by default
start_rpc: false --> Thrift protocol disabled by default

CassandraDaemon's nested Server type has two implementations: o.a.c.thrift.ThriftServer for the Thrift protocol, and o.a.c.transport.Server for the native binary protocol.

ThriftServer

The cassandra.thrift file lives under the interface directory of the installation package. It is mainly composed of:

data structures: Column, SuperColumn, etc.
service structs: ConsistencyLevel, ColumnParent, ColumnPath, SliceRange, KeyRange, KeySlice, Deletion, Mutation, TokenRange, ColumnDef, CfDef, KsDef, ColumnSlice, etc.
service API methods: get, get_slice, multiget_slice, get_range_slices, insert, add, remove, batch_mutate, get_multi_slice, etc.

public class ThriftServer implements CassandraDaemon.Server {
public void start() {
CassandraServer iface = getCassandraServer();
server = new ThriftServerThread(address, port, backlog, getProcessor(iface), getTransportFactory());
server.start();
}
private static class ThriftServerThread extends Thread {
private final TServer serverEngine;
public ThriftServerThread(...) {
serverEngine = new TServerCustomFactory(DatabaseDescriptor.getRpcServerType()).buildTServer(args);
}
public void run() {
serverEngine.serve();
}
}
}
public class CustomTThreadPoolServer extends TServer {
public void serve() {
serverTransport_.listen();
stopped = false;
while (!stopped) {
TTransport client = serverTransport_.accept();
processorFactory_.getProcessor(client_).process(input, output);
}
}
executorService.shutdown();
}
}

Take a get query as an example. The service definition in cassandra.thrift declares that get requires a row key, a ColumnPath, and a consistency level:

struct ColumnPath {
3: required string column_family,
4: optional binary super_column,
5: optional binary column,
}
service cassandra {
ColumnOrSuperColumn get(1:required binary key, 2:required ColumnPath column_path, 3:required ConsistencyLevel consistency_level=ConsistencyLevel.ONE)
}

The server-side handler lives in the TProcessor of the generated interface/thrift/gen-java/o.a.c.thrift.Cassandra class:

public static class get<I extends Iface> extends org.apache.thrift.ProcessFunction<I, get_args> {
public get_result getResult(I iface, get_args args) throws org.apache.thrift.TException {
get_result result = new get_result();
result.success = iface.get(args.key, args.column_path, args.consistency_level);
return result;
}
}

which ultimately calls the get method of o.a.c.thrift.CassandraServer:

public ColumnOrSuperColumn get(ByteBuffer key, ColumnPath column_path, ConsistencyLevel consistency_level) {
ThriftClientState cState = state();
String keyspace = cState.getKeyspace();
cState.hasColumnFamilyAccess(keyspace, column_path.column_family, Permission.SELECT);
CFMetaData metadata = ThriftValidation.validateColumnFamily(keyspace, column_path.column_family);
org.apache.cassandra.db.ConsistencyLevel consistencyLevel = ThriftConversion.fromThrift(consistency_level);
SortedSet<CellName> names = new TreeSet<CellName>(metadata.comparator);
names.add(metadata.comparator.cellFromByteBuffer(column_path.column));
IDiskAtomFilter filter = new NamesQueryFilter(names);
ReadCommand command = ReadCommand.create(keyspace, key, column_path.column_family, now, filter);
Map<DecoratedKey, ColumnFamily> cfamilies = readColumnFamily(Arrays.asList(command), consistencyLevel, cState);
ColumnFamily cf = cfamilies.get(StorageService.getPartitioner().decorateKey(command.key));
List<ColumnOrSuperColumn> tcolumns = thriftifyColumnFamily(cf, metadata.isSuper() && column_path.column != null, false, now);
return tcolumns.get(0);
}

The actual query for the ReadCommand built from the client request happens in readColumnFamily, which delegates the read to the StorageProxy proxy class:

protected Map<DecoratedKey, ColumnFamily> readColumnFamily(List<ReadCommand> commands, ConsistencyLevel consistency_level, ClientState cState) {
Map<DecoratedKey, ColumnFamily> columnFamilyKeyMap = new HashMap<DecoratedKey, ColumnFamily>();
schedule(DatabaseDescriptor.getReadRpcTimeout());
List<Row> rows = StorageProxy.read(commands, consistency_level, cState);
for (Row row: rows) {
columnFamilyKeyMap.put(row.key, row.cf);
}
return columnFamilyKeyMap;
}
Java Driver(Netty)

DataStax's Java client is implemented with Netty, and Cassandra's native server-side protocol is also implemented with Netty.

So understanding how the client sends data first helps explain how the server receives it. A simple example that reads the Cassandra version with the driver:

Cluster cluster = Cluster.builder()
.addContactPoints(CONTACT_POINTS).withPort(PORT)
.build();
Session session = cluster.connect();
ResultSet rs = session.execute("select release_version from system.local");
Row row = rs.one();
String releaseVersion = row.getString("release_version");

Session is the client's session object for the conversation with the server; once connect succeeds, the network channels between client and server are already fully established. Connection is the object that handles the actual connection between the client and a server node.



DataStax's driver is the Netty client, and Cassandra's nativeServer is the Netty server: the driver uses a Bootstrap to connect to the server, while the server uses a ServerBootstrap to accept client connections.

class Connection {
ListenableFuture<Void> initAsync() {
Bootstrap bootstrap = factory.newBootstrap();
ProtocolOptions protocolOptions = factory.configuration.getProtocolOptions();
bootstrap.handler( new Initializer(this, protocolVersion, protocolOptions.getCompression().compressor(), protocolOptions.getSSLOptions(), factory.configuration.getPoolingOptions().getHeartbeatIntervalSeconds(), factory.configuration.getNettyOptions(), factory.configuration.getCodecRegistry()));
ChannelFuture future = bootstrap.connect(address);
}
}

The client's main handler is the Initializer, and the component inside it that handles requests is Connection.Dispatcher:

private static class Initializer extends ChannelInitializer<SocketChannel> {
// Stateless handlers
private static final Message.ProtocolDecoder messageDecoder = new Message.ProtocolDecoder();
private static final Message.ProtocolEncoder messageEncoderV4 = new Message.ProtocolEncoder(ProtocolVersion.V4);
private static final Frame.Encoder frameEncoder = new Frame.Encoder();
private final Connection connection;
private final FrameCompressor compressor;
private final NettyOptions nettyOptions;
private final ChannelHandler idleStateHandler;
private final CodecRegistry codecRegistry;
protected void initChannel(SocketChannel channel) throws Exception {
// set the codec registry so that it can be accessed by ProtocolDecoder
channel.attr(Message.CODEC_REGISTRY_ATTRIBUTE_KEY).set(codecRegistry);
ChannelPipeline pipeline = channel.pipeline();
pipeline.addLast("frameDecoder", new Frame.Decoder());
pipeline.addLast("frameEncoder", frameEncoder);
if (compressor != null) {
pipeline.addLast("frameDecompressor", new Frame.Decompressor(compressor));
pipeline.addLast("frameCompressor", new Frame.Compressor(compressor));
}
}
pipeline.addLast("messageDecoder", messageDecoder);
pipeline.addLast("messageEncoder", messageEncoderFor(protocolVersion));
pipeline.addLast("idleStateHandler", idleStateHandler);
pipeline.addLast("dispatcher", connection.dispatcher);
nettyOptions.afterChannelInitialized(channel);
}
}

The Dispatcher appears to be responsible only for reading message responses:

class Dispatcher extends SimpleChannelInboundHandler<Message.Response> {
protected void channelRead0(ChannelHandlerContext ctx, Message.Response response) throws Exception {
int streamId = response.getStreamId();
ResponseHandler handler = pending.remove(streamId);
handler.cancelTimeout();
handler.callback.onSet(Connection.this, response, System.nanoTime() - handler.startTime, handler.retryCount);
if (isClosed()) tryTerminate(false);
}
}
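The pending map keyed by streamId is the multiplexing trick here: many requests share one connection, each reserving a stream id, and the response carrying the same id completes the matching handler. A minimal JDK-only sketch of the idea (class and method names are illustrative, not the driver's actual API):

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative sketch of stream-id multiplexing: many in-flight requests share
// one connection; responses are matched back to callers by stream id.
class StreamMultiplexer {
    private final Map<Integer, CompletableFuture<String>> pending = new ConcurrentHashMap<>();
    private final AtomicInteger nextId = new AtomicInteger();

    // Caller side: register a pending future under a fresh stream id.
    int register(CompletableFuture<String> future) {
        int id = nextId.getAndIncrement();
        pending.put(id, future);
        return id;
    }

    // Channel side: a response for streamId arrives; complete and forget it.
    boolean onResponse(int streamId, String response) {
        CompletableFuture<String> f = pending.remove(streamId);
        if (f == null) return false; // unknown or already-handled stream
        f.complete(response);
        return true;
    }
}
```

This mirrors why channelRead0 above does `pending.remove(streamId)` before invoking the callback: a stream id can be reused only after its response has been claimed.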

So where does the client actually send data? Let's follow session.execute from the example and look for the send path.

public ResultSetFuture executeAsync(final Statement statement) {
DefaultResultSetFuture future = new DefaultResultSetFuture(this, cluster.manager.protocolVersion(), makeRequestMessage(statement, null));
new RequestHandler(this, future, statement).sendRequest();
return future;
}

makeRequestMessage creates the request, so sendRequest must be what actually sends it:

class RequestHandler {
void sendRequest() {
startNewExecution();
}
private void startNewExecution() {
//the future is the callback; since the future holds the makeRequestMessage result, the callback's Request can be retrieved here
Message.Request request = callback.request();
SpeculativeExecution execution = new SpeculativeExecution(request, position);
runningExecutions.add(execution);
execution.sendRequest(); //send the request; the request is wrapped inside the execution
}
}

SpeculativeExecution implements speculative execution. Its QueryPlan is the query plan: based on the load-balancing policy configured on the client, it routes the request to one of the host nodes, and that host is the famous Coordinator.

class SpeculativeExecution implements Connection.ResponseCallback {
private final Message.Request request;
void sendRequest() {
Host host;
while (!isDone.get() && (host = queryPlan.next()) != null && !queryStateRef.get().isCancelled()) {
if (query(host)) return;
}
}
reportNoMoreHosts(this);
}
private boolean query(final Host host) {
HostConnectionPool currentPool = manager.pools.get(host);
if (allowSpeculativeExecutions && nextExecutionScheduled.compareAndSet(false, true))
scheduleExecution(speculativeExecutionPlan.nextExecution(host));
Connection connection = currentPool.borrowConnection(manager.configuration().getPoolingOptions().getPoolTimeoutMillis(), TimeUnit.MILLISECONDS);
write(connection, this);
return true;
}
private void write(Connection connection, Connection.ResponseCallback responseCallback) throws ConnectionException, BusyConnectionException {
connectionHandler = connection.write(responseCallback, statement.getReadTimeoutMillis(), false);
}
}

The query method appears to drop the request object, but write(connection, this) passes this along, so the request can still be recovered.

The write method keeps passing the responseCallback; as you can see, callback.request() brings it back to life, so our request object was never lost.

channel.writeAndFlush(request) is Netty's write call, i.e. the point where the client sends the request to the server.

ResponseHandler write(ResponseCallback callback, long statementReadTimeoutMillis, boolean startTimeout) throws ConnectionException, BusyConnectionException {
ResponseHandler handler = new ResponseHandler(this, statementReadTimeoutMillis, callback);
dispatcher.add(handler);
Message.Request request = callback.request().setStreamId(handler.streamId);
if (DISABLE_COALESCING) { //write through immediately, no buffering
channel.writeAndFlush(request).addListener(writeHandler(request, handler));
} else { //buffer (coalesce writes)
flush(new FlushItem(channel, request, writeHandler(request, handler)));
}
return handler;
}
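The DISABLE_COALESCING branch above distinguishes writing each frame through immediately from coalescing several frames into one flush. A toy sketch of the coalescing idea, with an in-memory list standing in for the channel (all names here are illustrative, not the driver's):

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of write coalescing: instead of flushing every message, buffer a few
// and flush them together to reduce syscall/flush overhead.
class CoalescingWriter {
    private final List<String> buffer = new ArrayList<>();
    private final int batchSize;
    final List<List<String>> flushed = new ArrayList<>(); // stands in for actual channel flushes

    CoalescingWriter(int batchSize) { this.batchSize = batchSize; }

    void write(String frame) {
        buffer.add(frame);
        if (buffer.size() >= batchSize) flush(); // batch is full: flush now
    }

    void flush() {
        if (buffer.isEmpty()) return;
        flushed.add(new ArrayList<>(buffer)); // one flush carries many frames
        buffer.clear();
    }
}
```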
nativeServer(Netty)

The native server uses Netty. The Initializer bound to the ServerBootstrap assembles the ChannelPipeline from several handlers:

Frame decoding/encoding
message decoding/encoding
message dispatch (the Dispatcher)

public class Server implements CassandraDaemon.Server {
private EventLoopGroup workerGroup;
private EventExecutor eventExecutorGroup;
private void run() {
eventExecutorGroup = new RequestThreadPoolExecutor();
boolean hasEpoll = enableEpoll ? Epoll.isAvailable() : false;
if (hasEpoll) {
workerGroup = new EpollEventLoopGroup();
} else {
workerGroup = new NioEventLoopGroup();
}
}
ServerBootstrap bootstrap = new ServerBootstrap()
.group(workerGroup)
.channel(hasEpoll ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
.childOption(ChannelOption.TCP_NODELAY, true)
.childOption(ChannelOption.SO_LINGER, 0)
.childOption(ChannelOption.SO_KEEPALIVE, DatabaseDescriptor.getRpcKeepAlive())
.childOption(ChannelOption.ALLOCATOR, CBUtil.allocator)
.childOption(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 32 * 1024)
.childOption(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, 8 * 1024);
bootstrap.childHandler(new Initializer(this));
bootstrap.bind(socket);
}
private static class Initializer extends ChannelInitializer {
private final Server server;
protected void initChannel(Channel channel) throws Exception {
ChannelPipeline pipeline = channel.pipeline();
pipeline.addFirst("connectionLimitHandler", new ConnectionLimitHandler()); //connection limiting
pipeline.addLast("frameDecoder", new Frame.Decoder(server.connectionFactory)); //frame decoding
pipeline.addLast("frameEncoder", new Frame.Encoder()); //frame encoding
pipeline.addLast("frameDecompressor", new Frame.Decompressor()); //frame decompression
pipeline.addLast("frameCompressor", new Frame.Compressor()); //frame compression
pipeline.addLast("messageDecoder", new Message.ProtocolDecoder()); //message decoding
pipeline.addLast("messageEncoder", new Message.ProtocolEncoder()); //message encoding
pipeline.addLast(server.eventExecutorGroup, "executor", new Message.Dispatcher()); //message dispatch
}
}
}

Encoding and decoding mirror request and response: when the server receives a request it decodes what the client sent (ProtocolDecoder), and after processing it encodes the response back to the client (ProtocolEncoder).

Like the Thrift protocol, the CQL protocol defines its data structures and service methods up front; the CQL protocol specification lives under the doc directory. The Frame defines the on-wire format of a message: the header is 9 bytes in total (40 + 32 = 72 bits / 8 = 9 bytes) and the body is variable-length. Message is the message abstraction built on top of Frame, which is why the Initializer assembles the ChannelPipeline with the Frame handlers first, then the Message handlers, and finally the Message Dispatcher. Request processing follows the same order: the server first receives the request, then parses out the request type, and only then handles it.
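As a rough illustration of such a fixed 9-byte header, here is a sketch assuming the v3+ field order (1-byte version, 1-byte flags, 2-byte stream id, 1-byte opcode, 4-byte body length), encoded with the JDK's ByteBuffer; FrameHeader is a made-up helper, not Cassandra's class:

```java
import java.nio.ByteBuffer;

// Sketch of a 9-byte native-protocol frame header:
// version(1) | flags(1) | stream id(2) | opcode(1) | body length(4)
class FrameHeader {
    static ByteBuffer encode(int version, int flags, short streamId, int opcode, int bodyLength) {
        ByteBuffer buf = ByteBuffer.allocate(9);
        buf.put((byte) version);
        buf.put((byte) flags);
        buf.putShort(streamId);
        buf.put((byte) opcode);
        buf.putInt(bodyLength);
        buf.flip();
        return buf;
    }

    static short decodeStreamId(ByteBuffer header) {
        return header.getShort(2); // bytes 2-3 hold the stream id
    }

    static int decodeBodyLength(ByteBuffer header) {
        return header.getInt(5); // last 4 bytes hold the body length
    }
}
```

The fixed-size header is what lets Frame.Decoder know exactly how many more bytes to wait for before it can hand a complete frame to the message decoder.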

There are several message types: ERROR, STARTUP, QUERY, RESULT, PREPARE, EXECUTE, EVENT, BATCH. Each type is fixed as either a Request or a Response; for example ERROR, RESULT, and EVENT are Responses, and the rest are Requests.
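The request/response split can be captured in a tiny enum. This is only a hypothetical shape of the idea (Cassandra's real Message class models it differently, with per-type codecs):

```java
// Illustrative: each native-protocol message type is fixed as request or response.
enum NativeMessageType {
    ERROR(false), STARTUP(true), QUERY(true), RESULT(false),
    PREPARE(true), EXECUTE(true), EVENT(false), BATCH(true);

    final boolean isRequest;

    NativeMessageType(boolean isRequest) { this.isRequest = isRequest; }

    boolean isResponse() { return !isRequest; }
}
```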

Dispatcher

The server-side Dispatcher handler receives a request, executes it, and returns the response. The flush here resembles the buffered flush the Netty client used when sending requests; in both cases the goal is to send data to the peer (the client sends requests to the server, the server sends responses back to the client).

public static class Dispatcher extends SimpleChannelInboundHandler<Request> {
public void channelRead0(ChannelHandlerContext ctx, Request request) {
ServerConnection connection = (ServerConnection)request.connection();
logger.trace("Received: {}, v={}", request, connection.getVersion());
Response response = request.execute(qstate); //the server executes the request
response.setStreamId(request.getStreamId());
response.attach(connection);
connection.applyStateTransition(request.type, response.type);
logger.trace("Responding: {}, v={}", response, connection.getVersion());
flush(new FlushItem(ctx, response, request.getSourceFrame()));
}
}

Taking a CQL query as an example, the trace log looks like this:

TRACE [SharedPool-Worker-1] 2016-10-11 17:02:27,345 Message.java:506 - Received: QUERY select * from velocity_app where attribute='zqhxuyuan' and type='login' and partner_code='tongdun' and app_name='tongdun_app';, v=4
TRACE [SharedPool-Worker-1] 2016-10-11 17:02:27,346 QueryProcessor.java:221 - Process [email protected] @CL.ONE
DEBUG [SharedPool-Worker-1] 2016-10-11 17:02:27,346 SliceQueryPager.java:92 - Querying next page of slice query; new filter: SliceQueryFilter [reversed=false, slices=[[, ]], count=100, toGroup = 1]
TRACE [SharedPool-Worker-1] 2016-10-11 17:02:27,347 ReadCallback.java:76 - Blockfor is 1; setting up requests to localhost/127.0.0.1
TRACE [SharedPool-Worker-1] 2016-10-11 17:02:27,347 AbstractReadExecutor.java:118 - reading data locally
TRACE [SharedPool-Worker-2] 2016-10-11 17:02:27,348 SliceQueryFilter.java:269 - collecting 0 of 100: 1111111111-1::false:[email protected]
TRACE [SharedPool-Worker-2] 2016-10-11 17:02:27,348 SliceQueryFilter.java:269 - collecting 1 of 100: 1111111111-1:event:false:[email protected]
TRACE [SharedPool-Worker-2] 2016-10-11 17:02:27,348 SliceQueryFilter.java:269 - collecting 1 of 100: 1111111111-1:timestamp:false:[email protected]
TRACE [SharedPool-Worker-1] 2016-10-11 17:02:27,348 StorageProxy.java:1444 - Read: 1 ms.
DEBUG [SharedPool-Worker-1] 2016-10-11 17:02:27,349 AbstractQueryPager.java:95 - Fetched 1 live rows
DEBUG [SharedPool-Worker-1] 2016-10-11 17:02:27,349 AbstractQueryPager.java:112 - Got result (1) smaller than page size (100), considering pager exhausted
DEBUG [SharedPool-Worker-1] 2016-10-11 17:02:27,349 AbstractQueryPager.java:133 - Remaining rows to page: 2147483646
TRACE [SharedPool-Worker-1] 2016-10-11 17:02:27,349 Message.java:525 - Responding: ROWS [attribute(forseti, velocity_app), org.apache.cassandra.db.marshal.UTF8Type][partner_code(forseti, velocity_app), org.apache.cassandra.db.marshal.UTF8Type][app_name(forseti, velocity_app), org.apache.cassandra.db.marshal.UTF8Type][type(forseti, velocity_app), org.apache.cassandra.db.marshal.UTF8Type][sequence_id(forseti, velocity_app), org.apache.cassandra.db.marshal.ReversedType(org.apache.cassandra.db.marshal.UTF8Type)][event(forseti, velocity_app), org.apache.cassandra.db.marshal.UTF8Type][timestamp(forseti, velocity_app), org.apache.cassandra.db.marshal.LongType]
| zqhxuyuan | tongdun | tongdun_app | login | 1111111111-1 | {jsondata} | 1111111111
---, v=4

With tracing on enabled, the query's execution trace on the server side is displayed:

Tracing session: 6eceb810-8f91-11e6-a2b4-dbe2eb0e3cb9
activity | timestamp | source | source_elapsed
-------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------------------+-----------+----------------
Execute CQL3 query | 2016-10-11 17:02:27.345000 | 127.0.0.1 | 0
Parsing select * from velocity_app where attribute='zqhxuyuan' and type='login' and partner_code='tongdun' and app_name='tongdun_app'; [SharedPool-Worker-1] | 2016-10-11 17:02:27.345000 | 127.0.0.1 | 333
Preparing statement [SharedPool-Worker-1] | 2016-10-11 17:02:27.346000 | 127.0.0.1 | 730
Executing single-partition query on velocity_app [SharedPool-Worker-2] | 2016-10-11 17:02:27.347000 | 127.0.0.1 | 2236
Acquiring sstable references [SharedPool-Worker-2] | 2016-10-11 17:02:27.347000 | 127.0.0.1 | 2357
Merging memtable tombstones [SharedPool-Worker-2] | 2016-10-11 17:02:27.347000 | 127.0.0.1 | 2446
Skipped 0/0 non-slice-intersecting sstables, included 0 due to tombstones [SharedPool-Worker-2] | 2016-10-11 17:02:27.347000 | 127.0.0.1 | 2591
Merging data from memtables and 0 sstables [SharedPool-Worker-2] | 2016-10-11 17:02:27.347000 | 127.0.0.1 | 2661
Read 1 live and 0 tombstone cells [SharedPool-Worker-2] | 2016-10-11 17:02:27.348000 | 127.0.0.1 | 3240
Request complete | 2016-10-11 17:02:27.349147 | 127.0.0.1 | 4147

You can see that the timestamps of the first and last lines of the log file essentially match the first and last lines of the tracing on output.

//log file
TRACE [SharedPool-Worker-1] 2016-10-11 17:02:27,345 Message.java:506 - Received: QUERY
TRACE [SharedPool-Worker-1] 2016-10-11 17:02:27,349 Message.java:525 - Responding: ROWS
//CQL tracing on
Execute CQL3 query | 2016-10-11 17:02:27.345000
Request complete | 2016-10-11 17:02:27.349147
QueryMessage

Take the o.a.c.transport.messages.QueryMessage request as an example:

public Message.Response execute(QueryState state) {
Tracing.instance.begin("Execute CQL3 query", state.getClientAddress(), builder.build());
Message.Response response = ClientState.getCQLQueryHandler().process(query, state, options, getCustomPayload());
return response;
}

The CQLQueryHandler implementation is QueryProcessor:

public ResultMessage process(String queryString, QueryState queryState, QueryOptions options) {
ParsedStatement.Prepared p = getStatement(queryString, queryState.getClientState());
options.prepare(p.boundNames);
CQLStatement prepared = p.statement;
return processStatement(prepared, queryState, options);
}
public ResultMessage processStatement(CQLStatement statement, QueryState queryState, QueryOptions options) {
logger.trace("Process {} @CL.{}", statement, options.getConsistency());
ClientState clientState = queryState.getClientState();
ResultMessage result = statement.execute(queryState, options);
return result == null ? new ResultMessage.Void() : result;
}

Recall Response response = request.execute(qstate): in our example the request is a QueryMessage (a Message.Request), and the return value is a ResultMessage, which is exactly a Message.Response.

This matches the message types described above, where QUERY is a Request and RESULT is a Response.

Moving from Message down into Statement, with SelectStatement as the example, we finally see a StorageProxy proxy call similar to the Thrift path.

Each message type usually maps to its own Statement: here QueryMessage leads to a SelectStatement, while Execute or Batch messages lead to different Statements.

The request object is transformed along the chain Request - Statement - Command; for example, query request - SelectStatement - ReadCommands.

public ResultMessage.Rows execute(QueryState state, QueryOptions options) throws RequestExecutionException, RequestValidationException {
ConsistencyLevel cl = options.getConsistency();
int limit = getLimit(options);
Pageable command = getPageableCommand(options, limit, now);
int pageSize = getPageSize(options);
if (pageSize <= 0 || command == null || !QueryPagers.mayNeedPaging(command, pageSize))
return execute(command, options, limit, now, state); //non-paged query
QueryPager pager = QueryPagers.pager(command, cl, state.getClientState(), options.getPagingState());
return execute(pager, options, limit, now, pageSize); //paged query
}
private ResultMessage.Rows execute(Pageable command, QueryOptions options, int limit, long now, QueryState state) {
List<Row> rows = command instanceof Pageable.ReadCommands ? StorageProxy.read(((Pageable.ReadCommands)command).commands, options.getConsistency(), state.getClientState()) : StorageProxy.getRangeSlice((RangeSliceCommand)command, options.getConsistency());
return processResults(rows, options, limit, now);
}

Whether it is the Thrift-protocol ThriftServer or the binary-protocol Server, both ultimately call the StorageProxy proxy class.


StorageProxy

StorageProxy's read method branches on whether the consistency level is serial: a regular read or a transactional read.

Cassandra's transaction support is implemented with Paxos; the corresponding read method is readWithPaxos.

// Performs the actual reading of a row out of the StorageService, fetching a specific set of column names from a given column family.
public static List<Row> read(List<ReadCommand> commands, ConsistencyLevel consistencyLevel, ClientState state) {
return consistencyLevel.isSerialConsistency()
? readWithPaxos(commands, consistencyLevel, state)
: readRegular(commands, consistencyLevel);
}
private static List<Row> readRegular(List<ReadCommand> commands, ConsistencyLevel consistencyLevel) {
List<Row> rows = fetchRows(commands, consistencyLevel);
return rows;
}
private static List<Row> fetchRows(List<ReadCommand> initialCommands, ConsistencyLevel consistencyLevel){
// send out read requests
for (int i = 0; i < commands.size(); i++) {
ReadCommand command = commands.get(i);
AbstractReadExecutor exec = AbstractReadExecutor.getReadExecutor(command, consistencyLevel);
exec.executeAsync();
}
}

The read executor comes in several implementations, e.g. the non-speculating NeverSpeculatingReadExecutor. The actual read work is still wrapped in the ReadCommand.

private static class NeverSpeculatingReadExecutor extends AbstractReadExecutor {
public void executeAsync() {
makeDataRequests(targetReplicas.subList(0, 1));
if (targetReplicas.size() > 1)makeDigestRequests(targetReplicas.subList(1, targetReplicas.size()));
}
}
private void makeRequests(ReadCommand readCommand, Iterable<InetAddress> endpoints) {
for (InetAddress endpoint : endpoints) {
if (isLocalRequest(endpoint)) { //the target endpoints include the local node
hasLocalEndpoint = true;
continue;
}
MessagingService.instance().sendRRWithFailure(message, endpoint, handler);
}
if (hasLocalEndpoint) { //execute locally right away; since remote data still needs to be read, a callback/handler is required
StageManager.getStage(Stage.READ).maybeExecuteImmediately(new LocalReadRunnable(command, handler));
}
}

When starting CassandraDaemon earlier, we said every Cassandra node starts both the Thrift and native servers, and the StorageProxy proxy class receives the various client requests (such as reads and writes).

But in a distributed system, which nodes should handle a given client request? Cassandra has a coordinator role: the node that receives the client's request. That node may not actually store the data, in which case it forwards the request to the nodes that should store it. Reads work the same way as writes: if the data is not stored on the coordinator, it cannot be read from the coordinator, so the coordinator sends read requests to the nodes that really store the data, waits for them to return the data, and then returns it to the client.

The node receiving the request, i.e. the coordinator, is responsible for building requests in makeRequests. If the client's request happens to also target the coordinator itself, the coordinator can serve the data directly.

So when isLocalRequest holds, the command is executed immediately on the local node via maybeExecuteImmediately. For the remaining remote nodes, the request carrying the command is sent out via sendRRWithFailure (the target node is determined by the second argument, endpoint).
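The local-vs-remote split described above can be sketched in a few lines of plain Java (ReadRouter and Plan are made-up names; only the decision logic mirrors makeRequests):

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch of the coordinator's split in makeRequests: a replica
// list containing the local node takes the in-process fast path, everything
// else would be messaged over the wire.
class ReadRouter {
    final String localEndpoint;

    ReadRouter(String localEndpoint) { this.localEndpoint = localEndpoint; }

    static class Plan {
        boolean executeLocally;                                  // maybeExecuteImmediately path
        final List<String> remoteEndpoints = new ArrayList<>();  // sendRRWithFailure path
    }

    Plan plan(List<String> replicas) {
        Plan p = new Plan();
        for (String endpoint : replicas) {
            if (endpoint.equals(localEndpoint))
                p.executeLocally = true;       // coordinator is itself a replica
            else
                p.remoteEndpoints.add(endpoint); // forward to the real owner
        }
        return p;
    }
}
```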

LocalReadRunnable wraps the ReadCommand and a callback; the actual read happens in command.getRow, finally returning a single Row. ReadCommand has two implementations: SliceFromReadCommand and SliceByNamesReadCommand.

static class LocalReadRunnable extends DroppableRunnable {
private final ReadCommand command;
private final ReadCallback<ReadResponse, Row> handler;
protected void runMayThrow() {
Keyspace keyspace = Keyspace.open(command.ksName);
Row r = command.getRow(keyspace);
ReadResponse result = ReadVerbHandler.getResponse(command, r);
handler.response(result);
}
}

Let's look at how a command handed to StorageProxy by a client call (e.g. a ReadCommand) travels on the server side:

AbstractReadExecutor (e.g. NeverSpeculatingReadExecutor) submits work to the thread pool via executeAsync
LocalReadRunnable is run via maybeExecuteImmediately
inside LocalReadRunnable, runMayThrow finally executes the ReadCommand's getRow
StorageService

Before starting the Thrift and native servers, CassandraDaemon first initializes StorageService. A freshly started Cassandra node tries to join the cluster; the network-related piece of this is the MessagingService.

StorageService, like the ThriftServer and the native Netty Server before it, is a server-side implementation; the difference is that StorageService is responsible for storage while the other two handle message transport and RPC calls.

Question: can StorageProxy be seen as a front proxy for StorageService, with client requests passing through StorageProxy before reaching StorageService? Or are StorageProxy and StorageService peers?

In fact the two should be peers: we see no calls from StorageProxy into StorageService, and StorageService can itself accept client requests.

public synchronized void initServer(int delay) {
prepareToJoin();
}
private void prepareToJoin() {
Gossiper.instance.register(this);
Gossiper.instance.start(SystemKeyspace.incrementAndGetGeneration(), appStates);
MessagingService.instance().listen();
LoadBroadcaster.instance.startBroadcasting();
HintedHandOffManager.instance.start();
BatchlogManager.instance.start();
}

Besides the message storage service class MessagingService, StorageService involves other message- and storage-related classes; together they make up Cassandra's distributed features, including:

Gossiper: the gossip protocol, used to keep cluster and node state consistent
HintedHandOffManager: manages temporarily failed requests when a node misbehaves
BatchlogManager: manages the batch log

MessagingService

The messaging service uses a raw ServerSocket. After the server-side thread starts, the SocketThread begins accepting client requests; requests are either stream connections or ordinary messages.

Streaming messages also come in several types and mainly occur for node-to-node bulk data exchange; for example, sstableloader and nodetool repair both spawn streaming threads.

http://www.datastax.com/dev/blog/streaming-in-cassandra-2-0

private void listen(InetAddress localEp) throws ConfigurationException {
for (ServerSocket ss : getServerSockets(localEp)) { //listen on the storage port
SocketThread th = new SocketThread(ss, "ACCEPT-" + localEp);
th.start();
socketThreads.add(th);
}
}
public static class SocketThread extends Thread {
private final ServerSocket server;
public void run() {
Socket socket = server.accept();
DataInputStream in = new DataInputStream(socket.getInputStream());
int header = in.readInt();
boolean isStream = MessagingService.getBits(header, 3, 1) == 1;
int version = MessagingService.getBits(header, 15, 8);
Thread thread = isStream
? new IncomingStreamingConnection(version, socket, connections)
: new IncomingTcpConnection(version, MessagingService.getBits(header, 2, 1) == 1, socket, connections);
thread.start();
}
}
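The getBits calls above unpack fields from the connection header int. Here is a sketch of a bit-extraction helper with that calling convention, where getBits(packed, start, count) reads count bits whose most significant bit sits at index start; the formula and the field positions (version in bits 15..8, stream flag at bit 3, compression flag at bit 2) are my reconstruction of the convention implied by the calls, not copied from Cassandra:

```java
// Sketch of header-bit unpacking: getBits(packed, start, count) reads `count`
// bits whose most significant bit is at index `start`.
class HeaderBits {
    static int getBits(int packed, int start, int count) {
        return packed >>> (start + 1 - count) & ~(-1 << count);
    }

    // Pack a header the way the reader above expects: version in bits 15..8,
    // the stream flag at bit 3, the compression flag at bit 2.
    static int makeHeader(int version, boolean isStream, boolean isCompressed) {
        int header = (version & 0xFF) << 8;
        if (isStream) header |= 1 << 3;
        if (isCompressed) header |= 1 << 2;
        return header;
    }
}
```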

IncomingTcpConnection is a background thread that keeps reading and processing messages, handing each one to the MessagingService instance.

One StorageService corresponds to one ServerSocket, each ServerSocket has only one SocketThread, and the SocketThread's run method ends after executing once (a single accept); keeping the connection running without interruption is therefore left to IncomingTcpConnection.

public void run(){
receiveMessages();
}
private void receiveMessages() {
DataInput in = new DataInputStream(socket.getInputStream());
while (true){
receiveMessage(in, version);
}
}
private InetAddress receiveMessage(DataInput input, int version) throws IOException {
MessageIn message = MessageIn.read(input, version, id);
MessagingService.instance().receive(message, id, timestamp, isCrossNodeTimestamp);
return message.from;
}

MessageIn is constructed from the input stream; its most important field is the verb, which determines the type of the message.

public class MessageIn<T> {
public final InetAddress from;
public final T payload;
public final Map<String, byte[]> parameters;
public final MessagingService.Verb verb;
public final int version;
}

After MessagingService.receive(MessageIn) gets a message, it creates a MessageDeliveryTask; each task runs on the thread pool of its corresponding Stage:

public void receive(MessageIn message, int id, long timestamp, boolean isCrossNodeTimestamp) {
Runnable runnable = new MessageDeliveryTask(message, id, timestamp, isCrossNodeTimestamp);
LocalAwareExecutorService stage = StageManager.getStage(message.getMessageType());
stage.execute(runnable, ExecutorLocals.create(state));
}

MessageDeliveryTask is also a runnable, but it is scheduled by a thread pool and finishes when done, unlike IncomingTcpConnection, which never ends.

public class MessageDeliveryTask implements Runnable {
private final MessageIn message;
public void run() {
MessagingService.Verb verb = message.verb;
IVerbHandler verbHandler = MessagingService.instance().getVerbHandler(verb);
verbHandler.doVerb(message, id);
}
}
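The verb-to-handler lookup above can be sketched with a plain registry map (VerbDispatcher and its Verb enum are illustrative; Cassandra keeps a similar handler map inside MessagingService and looks it up by the message's verb):

```java
import java.util.EnumMap;
import java.util.Map;

// Sketch of verb-based dispatch: each incoming message carries a verb, and a
// registry maps the verb to the handler that knows how to process it.
class VerbDispatcher {
    enum Verb { READ, MUTATION, GOSSIP }

    interface VerbHandler {
        String doVerb(String payload);
    }

    private final Map<Verb, VerbHandler> handlers = new EnumMap<>(Verb.class);

    void register(Verb verb, VerbHandler handler) {
        handlers.put(verb, handler);
    }

    String deliver(Verb verb, String payload) {
        VerbHandler h = handlers.get(verb);
        if (h == null) throw new IllegalStateException("no handler for " + verb);
        return h.doVerb(payload); // the handler does the actual work
    }
}
```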

First, look at how the task is scheduled: through LocalAwareExecutorService, which behaves like a thread pool. Note that execute does not actually run the task; it wraps the Runnable into a FutureTask and waits to be scheduled at some later point.

public abstract class AbstractLocalAwareExecutorService implements LocalAwareExecutorService
public void execute(Runnable command, ExecutorLocals locals) {
addTask(newTaskFor(command, null, locals));
}
protected <T> FutureTask<T> newTaskFor(Runnable runnable, T result, ExecutorLocals locals) {
if (locals != null) {
return new LocalSessionFutureTask<T>(runnable, result, locals);
}
return new FutureTask<>(runnable, result);
}
}
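The point that execute only wraps and enqueues the task, rather than running it, can be shown with a toy executor (DeferredExecutor is a made-up name; only the FutureTask wrapping mirrors the code above):

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.FutureTask;

// Sketch of deferred execution: execute() only wraps and enqueues the task;
// a worker drains the queue at some later point.
class DeferredExecutor {
    private final Queue<FutureTask<?>> tasks = new ArrayDeque<>();

    <T> FutureTask<T> execute(Runnable command, T result) {
        FutureTask<T> task = new FutureTask<>(command, result);
        tasks.add(task); // not run yet, just queued
        return task;
    }

    void drain() { // later, a worker thread runs the backlog
        FutureTask<?> t;
        while ((t = tasks.poll()) != null) t.run();
    }
}
```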

We said earlier that the coordinator forwards client requests to non-local nodes; that actually uses OutboundTcpConnection, while for receiving messages the server side uses StorageService's IncomingTcpConnection.



Some commonalities between StorageProxy and StorageService's MessagingService:

Service class | Thread pool | Command | Task | Actual execution
StorageProxy | AbstractReadExecutor | ReadCommand | LocalReadRunnable | ReadCommand.getRow
MessagingService | LocalAwareExecutorService | MessageIn | MessageDeliveryTask/ExecutorLocals | IVerbHandler.doVerb(messageIn)

The IVerbHandler interface

IVerbHandler, like the message types, has multiple implementations.

Compare with the earlier StorageProxy path: there, the ReadCommand executes getRow directly, whereas with an IVerbHandler the call is ReadVerbHandler.doVerb(messageIn), where the messageIn carries the ReadCommand.

So ReadVerbHandler is really just a thin wrapper around ReadCommand: ReadVerbHandler.doVerb ultimately calls ReadCommand.getRow as well.

Then why have both ReadCommand and ReadVerbHandler? ReadCommand is merely the handling of the read operation itself, while ReadVerbHandler not only…
