Redisson command send and receive flow analysis
I. Example: we start with the simplest command, GET.
    RBucket<Object> t = redissonClient.getBucket("syncTradeUid_idOff");
    int idOff = (int) t.get();

II. Redisson auto-configuration in Spring Boot
    @Order(value = 4001)
    @ConditionalOnProperty("redisson.password")
    @Configuration
    @EnableConfigurationProperties({RedissonProperties.class})
    public class RedissonAutoConfiguration {

        public RedissonAutoConfiguration() {
            System.out.println("========================== redis initialized successfully =======================");
        }

        @Autowired
        private RedissonProperties redissonProperties;

        // Single-server mode: active when redisson.address is set
        @Bean(name = "redissonClient")
        @ConditionalOnProperty(name = "redisson.address")
        RedissonClient redissonSingle() {
            Config config = new Config();
            config.setCodec(new FastJsonCodec());
            SingleServerConfig serverConfig = config.useSingleServer()
                    .setAddress(redissonProperties.getAddress())
                    .setTimeout(redissonProperties.getTimeout())
                    .setConnectionPoolSize(redissonProperties.getConnectionPoolSize())
                    .setConnectionMinimumIdleSize(redissonProperties.getConnectionMinimumIdleSize());
            if (!StrUtil.isEmpty(redissonProperties.getPassword())) {
                serverConfig.setPassword(redissonProperties.getPassword());
            }
            return Redisson.create(config);
        }

        /**
         * Sentinel mode auto-configuration: active when redisson.masterName is set
         * @return
         */
        @Bean(name = "redissonClient")
        @ConditionalOnProperty(name = "redisson.masterName")
        RedissonClient redissonSentinel() {
            Config config = new Config();
            config.setCodec(new FastJsonCodec());
            SentinelServersConfig serverConfig = config.useSentinelServers()
                    .addSentinelAddress(redissonProperties.getSentinelAddresses())
                    .setMasterName(redissonProperties.getMasterName())
                    .setTimeout(redissonProperties.getTimeout())
                    .setMasterConnectionPoolSize(redissonProperties.getMasterConnectionPoolSize())
                    .setSlaveConnectionPoolSize(redissonProperties.getSlaveConnectionPoolSize())
                    .setReadMode(ReadMode.SLAVE);
            if (!StrUtil.isEmpty(redissonProperties.getPassword())) {
                serverConfig.setPassword(redissonProperties.getPassword());
            }
            return Redisson.create(config);
        }
    }

application.properties
    # Single server
    redisson.address = redis://127.0.0.1:6379
    redisson.password =
    # Sentinel
    #redisson.masterName=BF-20190319DBXF
    #redisson.schema=redis://
    #redisson.sentinelAddresses=redis://127.0.0.1:26379,redis://127.0.0.1:26479,redis://127.0.0.1:26579
    #redisson.password=

III. Redisson auto-configuration initialization flow
1. Everything starts with Redisson.create(config), which creates the Redisson object. Redisson implements the RedissonClient interface.
2. Create the connection manager object.
org.redisson.config.ConfigSupport

    public static ConnectionManager createConnectionManager(Config configCopy) {
        UUID id = UUID.randomUUID();

        // The first server configuration that is set decides the manager type
        if (configCopy.getMasterSlaveServersConfig() != null) {
            validate(configCopy.getMasterSlaveServersConfig());
            return new MasterSlaveConnectionManager(configCopy.getMasterSlaveServersConfig(), configCopy, id);
        } else if (configCopy.getSingleServerConfig() != null) {
            validate(configCopy.getSingleServerConfig());
            return new SingleConnectionManager(configCopy.getSingleServerConfig(), configCopy, id);
        } else if (configCopy.getSentinelServersConfig() != null) {
            validate(configCopy.getSentinelServersConfig());
            return new SentinelConnectionManager(configCopy.getSentinelServersConfig(), configCopy, id);
        } else if (configCopy.getClusterServersConfig() != null) {
            validate(configCopy.getClusterServersConfig());
            return new ClusterConnectionManager(configCopy.getClusterServersConfig(), configCopy, id);
        } else if (configCopy.getReplicatedServersConfig() != null) {
            validate(configCopy.getReplicatedServersConfig());
            return new ReplicatedConnectionManager(configCopy.getReplicatedServersConfig(), configCopy, id);
        } else if (configCopy.getConnectionManager() != null) {
            return configCopy.getConnectionManager();
        } else {
            throw new IllegalArgumentException("server(s) address(es) not defined!");
        }
    }

3. Next, look at MasterSlaveEntry.setupMasterEntry, which creates the RedisClient and connects to the Redis server.
org.redisson.connection.MasterSlaveEntry

    public RFuture<RedisClient> setupMasterEntry(RedisURI address) {
        RedisClient client = connectionManager.createClient(NodeType.MASTER, address, sslHostname);
        return setupMasterEntry(client);
    }

4. Create the RedisClient. Single-server mode also goes through the master-slave manager, simply with a master and no slaves, so both modes share one code path.
org.redisson.connection.MasterSlaveConnectionManager

    @Override
    public RedisClient createClient(NodeType type, RedisURI address, int timeout, int commandTimeout, String sslHostname) {
        RedisClientConfig redisConfig = createRedisConfig(type, address, timeout, commandTimeout, sslHostname);
        return RedisClient.create(redisConfig);
    }

5. RedisClient creates the Netty Bootstrap, channel, and handlers.
org.redisson.client.RedisClient

    private Bootstrap createBootstrap(RedisClientConfig config, Type type) {
        Bootstrap bootstrap = new Bootstrap()
                .resolver(config.getResolverGroup())
                .channel(config.getSocketChannelClass())
                .group(config.getGroup());

        bootstrap.handler(new RedisChannelInitializer(bootstrap, config, this, channels, type));
        bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, config.getConnectTimeout());
        bootstrap.option(ChannelOption.SO_KEEPALIVE, config.isKeepAlive());
        bootstrap.option(ChannelOption.TCP_NODELAY, config.isTcpNoDelay());
        config.getNettyHook().afterBoostrapInitialization(bootstrap);
        return bootstrap;
    }

6. Now look at RedisChannelInitializer to see which inbound and outbound handlers are added.
org.redisson.client.handler.RedisChannelInitializer

    @Override
    protected void initChannel(Channel ch) throws Exception {
        initSsl(config, ch);

        if (type == Type.PLAIN) {
            ch.pipeline().addLast(new RedisConnectionHandler(redisClient));
        } else {
            ch.pipeline().addLast(new RedisPubSubConnectionHandler(redisClient));
        }

        ch.pipeline().addLast(
                connectionWatchdog,
                CommandEncoder.INSTANCE,
                CommandBatchEncoder.INSTANCE,
                new CommandsQueue());

        if (pingConnectionHandler != null) {
            ch.pipeline().addLast(pingConnectionHandler);
        }

        if (type == Type.PLAIN) {
            ch.pipeline().addLast(new CommandDecoder(config.getAddress().getScheme()));
        } else {
            ch.pipeline().addLast(new CommandPubSubDecoder(config));
        }

        ch.pipeline().addLast(new ErrorsLoggingHandler());
        config.getNettyHook().afterChannelInitialization(ch);
    }
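To make the role of the encoder in this pipeline more concrete: Redis commands travel on the wire in the RESP format, as an array of bulk strings. The following is a minimal sketch of that framing in plain Java. It is not Redisson's CommandEncoder; the names RespSketch and encode are hypothetical and exist only for illustration.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

// Hypothetical illustration class; not part of Redisson.
final class RespSketch {

    // Encodes a command name and its arguments as a RESP array of bulk strings.
    static byte[] encode(String... parts) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeAscii(out, "*" + parts.length + "\r\n");      // array header: element count
        for (String part : parts) {
            byte[] bytes = part.getBytes(StandardCharsets.UTF_8);
            writeAscii(out, "$" + bytes.length + "\r\n");  // bulk string header: byte length
            out.write(bytes, 0, bytes.length);
            writeAscii(out, "\r\n");
        }
        return out.toByteArray();
    }

    private static void writeAscii(ByteArrayOutputStream out, String s) {
        byte[] b = s.getBytes(StandardCharsets.US_ASCII);
        out.write(b, 0, b.length);
    }

    public static void main(String[] args) {
        // The GET from the example at the top of this article.
        byte[] frame = encode("GET", "syncTradeUid_idOff");
        System.out.print(new String(frame, StandardCharsets.UTF_8));
    }
}
```

The point of the sketch is only the frame layout: one count line, then a length line and a payload line per argument.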
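7. Once the RedisClient is created, it connects to the Redis server. The address is first resolved asynchronously. On success, the master entry is added to the write connection pool, which creates the connections, and creating a connection is what actually connects to the Redis server.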
org.redisson.connection.MasterSlaveEntry

    private RFuture<RedisClient> setupMasterEntry(RedisClient client) {
        RPromise<RedisClient> result = new RedissonPromise<RedisClient>();
        // Shut the client down if setup fails
        result.onComplete((res, e) -> {
            if (e != null) {
                client.shutdownAsync();
            }
        });

        RFuture<InetSocketAddress> addrFuture = client.resolveAddr();
        addrFuture.onComplete((res, e) -> {
            if (e != null) {
                result.tryFailure(e);
                return;
            }

            masterEntry = new ClientConnectionsEntry(
                    client,
                    config.getMasterConnectionMinimumIdleSize(),
                    config.getMasterConnectionPoolSize(),
                    config.getSubscriptionConnectionMinimumIdleSize(),
                    config.getSubscriptionConnectionPoolSize(),
                    connectionManager,
                    NodeType.MASTER);

            int counter = 1;
            if (config.getSubscriptionMode() == SubscriptionMode.MASTER) {
                counter++;
            }

            CountableListener<RedisClient> listener = new CountableListener<>(result, client, counter);
            // Adding the entry to the write pool triggers connection creation
            RFuture<Void> writeFuture = writeConnectionPool.add(masterEntry);
            writeFuture.onComplete(listener);
        });
        return result;
    }

8. Look at how the connection to the Redis server is made.
org.redisson.connection.pool.ConnectionPool

    private void initConnections(ClientConnectionsEntry entry, RPromise<Void> initPromise, boolean checkFreezed) {
        int minimumIdleSize = getMinimumIdleSize(entry);

        if (minimumIdleSize == 0 || (checkFreezed && entry.isFreezed())) {
            initPromise.trySuccess(null);
            return;
        }

        AtomicInteger initializedConnections = new AtomicInteger(minimumIdleSize);
        // The first batch is capped at 10 concurrent connection attempts
        int startAmount = Math.min(10, minimumIdleSize);
        AtomicInteger requests = new AtomicInteger(startAmount);
        for (int i = 0; i < startAmount; i++) {
            createConnection(checkFreezed, requests, entry, initPromise, minimumIdleSize, initializedConnections);
        }
    }

Here you can see that the pool starts at most 10 connection attempts in its first batch (Math.min(10, minimumIdleSize)).
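As a small worked example of the cap in initConnections, the sketch below evaluates the same Math.min expression for a few minimum idle sizes. The class name InitialBatchSketch and the sample values are assumptions chosen for illustration; how the connections beyond the first batch are created is not shown in the excerpt above and is not modelled here.

```java
// Hypothetical illustration class; not part of Redisson.
final class InitialBatchSketch {

    // Same expression as in initConnections: the first batch never exceeds 10.
    static int startAmount(int minimumIdleSize) {
        return Math.min(10, minimumIdleSize);
    }

    public static void main(String[] args) {
        // Sample minimum idle sizes, chosen arbitrarily for the demonstration.
        for (int idle : new int[] {4, 10, 24}) {
            System.out.println("minimumIdleSize=" + idle + " first batch=" + startAmount(idle));
        }
    }
}
```

So a pool configured with a minimum idle size below 10 starts all of its connections at once, while a larger one starts only 10 in the first loop.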
9. Request a new connection from the connection pool.
ConnectionPool

    // Abridged: the body of the completion callback is omitted
    private void createConnection(boolean checkFreezed, AtomicInteger requests, ClientConnectionsEntry entry,
            RPromise<Void> initPromise, int minimumIdleSize, AtomicInteger initializedConnections) {
        acquireConnection(entry, new Runnable() {
            @Override
            public void run() {
                RPromise<T> promise = new RedissonPromise<T>();
                createConnection(entry, promise);
                promise.onComplete((conn, e) -> {
                });
            }
        });
    }

10. The connection is ultimately created in the asynchronous method RedisClient.connectAsync.
    // Abridged: handling of the channel future is omitted
    public RFuture<RedisConnection> connectAsync() {
        final RPromise<RedisConnection> f = new RedissonPromise<RedisConnection>();

        RFuture<InetSocketAddress> addrFuture = resolveAddr();
        addrFuture.onComplete((res, e) -> {
            if (e != null) {
                f.tryFailure(e);
                return;
            }

            ChannelFuture channelFuture = bootstrap.connect(res);
        });
        return f;
    }

11. When the channel is registered, RedisConnectionHandler.channelRegistered creates the connection object.
    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        if (connection == null) {
            connection = createConnection(ctx);
        }
        super.channelRegistered(ctx);
    }

12. Here the channel is assigned and stored. Each channel carries its RedisConnection as a channel attribute.
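RedisConnection

    public void updateChannel(Channel channel) {
        this.channel = channel;
        channel.attr(CONNECTION).set(this);
    }

13. Once the channel is active, the handler sends AUTH (if a password is configured) and the PING heartbeat command (if a ping interval is configured).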
BaseConnectionHandler

    @Override
    public void channelActive(final ChannelHandlerContext ctx) {
        List<RFuture<Object>> futures = new ArrayList<RFuture<Object>>();

        RedisClientConfig config = redisClient.getConfig();
        // Authenticate first when a password is configured
        if (config.getPassword() != null) {
            RFuture<Object> future;
            if (config.getUsername() != null) {
                future = connection.async(RedisCommands.AUTH, config.getUsername(), config.getPassword());
            } else {
                future = connection.async(RedisCommands.AUTH, config.getPassword());
            }
            futures.add(future);
        }
        if (config.getPingConnectionInterval() > 0) {
            RFuture<Object> future = connection.async(RedisCommands.PING);
            futures.add(future);
        }

        final AtomicBoolean retry = new AtomicBoolean();
        final AtomicInteger commandsCounter = new AtomicInteger(futures.size());
        for (RFuture<Object> future : futures) {
            future.onComplete((res, e) -> {
                if (e != null) {
                    // The server is still loading its dataset: retry once after 1 second
                    if (e instanceof RedisLoadingException) {
                        if (retry.compareAndSet(false, true)) {
                            ctx.executor().schedule(() -> {
                                channelActive(ctx);
                            }, 1, TimeUnit.SECONDS);
                        }
                        return;
                    }
                    connection.closeAsync();
                    connectionPromise.tryFailure(e);
                    return;
                }
                // All handshake commands succeeded: the connection is ready
                if (commandsCounter.decrementAndGet() == 0) {
                    ctx.fireChannelActive();
                    connectionPromise.trySuccess(connection);
                }
            });
        }
    }

IV. Sending a command
1. Start from the set method of RedissonBucket. The commandExecutor used there comes from the command executor held by the connectionManager.
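2. Execution then enters RedisExecutor.execute, which runs the command asynchronously. It first obtains a connection from the pool and, once that asynchronous acquisition completes, sends the command.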
    // Abridged: attemptPromise and writeFuture are declared in the omitted parts
    public void execute() {
        codec = getCodec(codec);

        RFuture<RedisConnection> connectionFuture = getConnection();
        connectionFuture.onComplete((connection, e) -> {
            sendCommand(attemptPromise, connection);

            writeFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    checkWriteFuture(writeFuture, attemptPromise, connection);
                }
            });

            releaseConnection(attemptPromise, connectionFuture);
        });
    }

3. Connections are obtained from the connection pool. Depending on whether the command is a read or a write, the connection manager selects an available connection and returns it.
RedisExecutor

    protected RFuture<RedisConnection> getConnection() {
        if (readOnlyMode) {
            connectionFuture = connectionManager.connectionReadOp(source, command);
        } else {
            connectionFuture = connectionManager.connectionWriteOp(source, command);
        }
        return connectionFuture;
    }

4. Next, RedisConnection.send writes the data to the channel.

RedisConnection

    public <T, R> ChannelFuture send(CommandData<T, R> data) {
        return channel.writeAndFlush(data);
    }

5. Among the Netty handlers in the pipeline is CommandsQueue, a per-connection command queue. Only one command is in flight on a connection at any time; the next command is sent once the current one has completed.
org.redisson.client.handler.CommandsQueue

    private void sendData(Channel ch) {
        QueueCommandHolder command = queue.peek();
        // trySend() succeeds only once per holder, so the head is written a single time
        if (command != null && command.trySend()) {
            QueueCommand data = command.getCommand();
            List<CommandData<Object, Object>> pubSubOps = data.getPubSubOperations();
            if (!pubSubOps.isEmpty()) {
                for (CommandData<Object, Object> cd : pubSubOps) {
                    for (Object channel : cd.getParams()) {
                        ch.pipeline().get(CommandPubSubDecoder.class).addPubSubCommand((ChannelName) channel, cd);
                    }
                }
            } else {
                // Remember the in-flight command so the decoder can match the reply to it
                ch.attr(CURRENT_COMMAND).set(data);
            }

            command.getChannelPromise().addListener(listener);
            ch.writeAndFlush(data, command.getChannelPromise());
        }
    }
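The "one command in flight per connection" behaviour described above can be sketched without Netty. The following plain-Java model is an assumption-laden simplification, not the CommandsQueue implementation: SingleFlightQueueSketch, submit, onReply and the wire list (standing in for the socket) are all hypothetical names.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical illustration class; not part of Redisson.
final class SingleFlightQueueSketch {
    private final Deque<String> queue = new ArrayDeque<>();
    private boolean headSent = false;              // plays the role of trySend()
    final List<String> wire = new ArrayList<>();   // stands in for the socket

    // Enqueue a command; it is written only if nothing else is in flight.
    void submit(String command) {
        queue.add(command);
        sendHead();
    }

    // Called when the reply for the in-flight command has arrived.
    void onReply() {
        queue.poll();
        headSent = false;
        sendHead();
    }

    private void sendHead() {
        String head = queue.peek();
        if (head != null && !headSent) {
            headSent = true;
            wire.add(head);
        }
    }

    public static void main(String[] args) {
        SingleFlightQueueSketch q = new SingleFlightQueueSketch();
        q.submit("GET a");
        q.submit("GET b");           // queued, not yet written
        System.out.println(q.wire);  // only the first command has been written
        q.onReply();                 // reply for "GET a" releases "GET b"
        System.out.println(q.wire);
    }
}
```

Because replies on one connection arrive in request order, keeping a single in-flight command lets the decoder pair each reply with the command stored as the current one.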
V. Receiving data and completing the callback
1. The inbound handler: when data arrives, it reads the pending command from the channel's CURRENT_COMMAND attribute.
CommandDecoder

    @Override
    protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        QueueCommand data = ctx.channel().attr(CommandsQueue.CURRENT_COMMAND).get();

        if (state() == null) {
            state(new State());
        }

        if (data == null) {
            // No pending command: consume whatever replies are in the buffer
            while (in.writerIndex() > in.readerIndex()) {
                int endIndex = skipCommand(in);

                try {
                    decode(ctx, in, data);
                } catch (Exception e) {
                    in.readerIndex(endIndex);
                    throw e;
                }
            }
        } else {
            int endIndex = 0;
            if (!(data instanceof CommandsData)) {
                endIndex = skipCommand(in);
            }

            try {
                decode(ctx, in, data);
            } catch (Exception e) {
                // On failure, skip past the reply so the next one starts cleanly
                if (!(data instanceof CommandsData)) {
                    in.readerIndex(endIndex);
                }
                throw e;
            }
        }
    }
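For the GET in the opening example, the server answers with a RESP bulk string such as `$5\r\nhello\r\n`, or `$-1\r\n` when the key does not exist. The sketch below parses just that one reply type from a complete byte array. It is a simplified illustration under those assumptions, not the CommandDecoder logic, which handles every reply type as well as partial reads; BulkReplySketch and parseBulk are hypothetical names.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical illustration class; not part of Redisson.
final class BulkReplySketch {

    // Parses one complete RESP bulk string reply; returns null for the nil reply "$-1\r\n".
    static String parseBulk(byte[] reply) {
        if (reply.length == 0 || reply[0] != '$') {
            throw new IllegalArgumentException("not a bulk string reply");
        }
        int lineEnd = indexOfCrlf(reply, 1);
        int length = Integer.parseInt(new String(reply, 1, lineEnd - 1, StandardCharsets.US_ASCII));
        if (length < 0) {
            return null; // key does not exist
        }
        int start = lineEnd + 2; // skip the CRLF after the length line
        if (start + length > reply.length) {
            throw new IllegalArgumentException("incomplete reply");
        }
        return new String(reply, start, length, StandardCharsets.UTF_8);
    }

    private static int indexOfCrlf(byte[] data, int from) {
        for (int i = from; i + 1 < data.length; i++) {
            if (data[i] == '\r' && data[i + 1] == '\n') {
                return i;
            }
        }
        throw new IllegalArgumentException("incomplete reply");
    }

    public static void main(String[] args) {
        byte[] reply = "$5\r\nhello\r\n".getBytes(StandardCharsets.UTF_8);
        System.out.println(parseBulk(reply));
    }
}
```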
2. Set the decoded result on the promise that belongs to the command.
CommandDecoder

    protected void completeResponse(CommandData<Object, Object> data, Object result) {
        if (data != null) {
            data.getPromise().trySuccess(result);
        }
    }

3. The calling thread waits for the result of the asynchronous promise.
CommandAsyncService

    @Override
    public <V> V get(RFuture<V> future) {
        try {
            // Block the caller until the decoder completes the promise
            future.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        if (future.isSuccess()) {
            return future.getNow();
        }

        throw convertException(future);
    }
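The hand-off between the I/O thread that completes the promise and the caller that blocks on it can be sketched with the JDK's CompletableFuture in place of RFuture and RPromise. This is an analogy under that substitution, not Redisson code: BlockingGetSketch is a hypothetical name, and unlike the method above it throws on interruption instead of continuing.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

// Hypothetical illustration class; CompletableFuture stands in for RFuture/RPromise.
final class BlockingGetSketch {

    // Blocks the calling thread until the future is completed by another thread.
    static <V> V get(CompletableFuture<V> future) {
        try {
            return future.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            throw new IllegalStateException("interrupted while waiting", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    public static void main(String[] args) {
        CompletableFuture<String> promise = new CompletableFuture<>();
        // Simulates the decoder completing the promise on the I/O thread.
        Thread ioThread = new Thread(() -> promise.complete("42"));
        ioThread.start();
        System.out.println(get(promise));
    }
}
```

This is why a synchronous call such as t.get() in the opening example can sit on top of a fully asynchronous Netty pipeline.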