杭州.net培训
达内杭州.net培训中心

13732203138

热门课程

如何在.NETCore实现RedisClient

  • 时间:2018-05-17
  • 发布:杭州.NET培训
  • 来源:企业笔试题

要想自行实现redisClient,则必须先要了解Redis的socket能信协议。新版统一请求协议在Redis 1.2版本中引入,并最终在Redis 2.0版本成为Redis服务器通信的标准方式。在这个协议中,所有发送至Redis服务器的参数都是二进制安全(binary safe)的。

以下是这个协议的一般形式

*<参数数量> CR LF

$<参数1的字节数量> CR LF

<参数1的数据> CR LF

...

$<参数N的字节数量> CR LF

<参数N的数据> CR LF

注:命令本身也作为协议的其中一个参数来发送。举个例子,以下是一个命令协议的打印版本:

*3

$3

SET

$5

mykey

$7

myvalue

这个命令的实际协议值如下:

"*3\r\n$3\r\nSET\r\n$5\r\nmykey\r\n$7\r\nmyvalue\r\n"

稍后看到,这种格式除了用作命令请求协议之外,也用在命令的回复协议中:这种只有一个参数的回复格式被称为批量回复(Bulk Reply)。

统一协议请求原本是用在回复协议中,用于将列表的多个项返回给客户端的,这种回复格式被称为多条批量回复(Multi Bulk Reply)。一个多条批量回复以*<argc>\r\n为前缀,后跟多条不同的批量回复,其中argc为这些批量回复的数量。

Redis命令会返回多种不同类型的回复。一个状态回复(或者单行回复,single line reply)是一段以"+"开始、"\r\n"结尾的单行字符串。通过检查服务器发回数据的第一个字节,可以确定这个回复是什么类型:

·        状态回复(status reply)的第一个字节是"+"

·        错误回复(error reply)的第一个字节是"-"

·        整数回复(integer reply)的第一个字节是":"

·        批量回复(bulk reply)的第一个字节是"$"

·        多条批量回复(multi bulk reply)的第一个字节是"*"

二 .NET Core Socket

说起socket,就不得不说IOCP了,这个方案本身就是为了解决多连接、高并发而设计的;但是话又说回来,任何方案都有局限性,不可能解决所有问题;这里不去讨论用在这里是否合适,反正本人就是想这么试一把:用一个简单的ioc模式实现SAEA.Socket,并为此设定各种场景,反过来优化SAEA.Socket本身。下面是一段服务器接收连接的代码:

private void ProcessAccept(SocketAsyncEventArgs args)

{

if (args == null)

{

args = new SocketAsyncEventArgs();

args.Completed += ProcessAccepted;

}

else

{

args.AcceptSocket = null;

}

if (!_listener.AcceptAsync(args))

{

ProcessAccepted(_listener, args);

}

}

项目结构

在网上找到redis的命令文档后,本人觉的准备工作差不多了,可以初步定一下项目结构:

·        Core:定义的是Redisclient相关最基本的业务

·        Interface:定义的是一些需要抽象出来的接口

·        Model:定义的是redis的数据模型及其请求、回复的类型枚举

·        Net:这里就是将继承实现SAEA.Socket而来的RedisConnection通信基础

三 命令解码器

通过前面的准备工作了解到redisClient的关键在于命令的编解码,至于高大上算法或redis官方算法的实现,本人没有去详细了解,一冲动就自行实现了自定义版的解码器。

public string Coder(RequestType commandName, params string[] @params)

{

_autoResetEvent.WaitOne();

_commandName = commandName;

var sb = new StringBuilder();

sb.AppendLine("*" + @params.Length);

foreach (var param in @params)

{

sb.AppendLine("$" + param.Length);

sb.AppendLine(param);

}

return sb.ToString();

}

public ResponseData Decoder()

{

var result = new ResponseData();

string command = ull;

string error = null;

var len = 0;

switch (_commandName)

{

case RequestType.PING:

command = BlockDequeue();

if (GetStatus(command, out error))

{

result.Type = ResponseType.OK;

result.Data = "PONG";

}

else

{

result.Type = ResponseType.Error;

result.Data = error;

}

break;

case RequestType.AUTH:

case RequestType.SELECT:

case RequestType.SLAVEOF:

case RequestType.SET:

case RequestType.DEL:

case RequestType.HSET:

case RequestType.HDEL:

case RequestType.LSET:

command = BlockDequeue();

if (GetStatus(command, out error))

{

result.Type = ResponseType.OK;

result.Data = "OK";

}

else

{

result.Type = ResponseType.Error;

result.Data = error;

}

break;

case RequestType.TYPE:

command = BlockDequeue();

if (GetStatusString(command, out string msg))

{

result.Type = ResponseType.OK;

}

else

{

result.Type = ResponseType.Error;

}

result.Data = msg;

break;

case RequestType.GET:

case RequestType.GETSET:

case RequestType.HGET:

case RequestType.LPOP:

case RequestType.RPOP:

case RequestType.SRANDMEMBER:

case RequestType.SPOP:

len = GetWordsNum(BlockDequeue(), out error);

if (len == -1)

{

result.Type = ResponseType.Empty;

result.Data = error;

}

else

{

result.Type = ResponseType.String;

result.Data += BlockDequeue();

}

break;

case RequestType.KEYS:

case RequestType.HKEYS:

case RequestType.LRANGE:

case RequestType.SMEMBERS:

result.Type = ResponseType.Lines;

var sb = new StringBuilder();

var rn = GetRowNum(BlockDequeue(), out error);

if (!string.IsNullOrEmpty(error))

{

result.Type = ResponseType.Error;

result.Data = error;

break;

}

//再尝试读取一次,发现有回车行出现

if (rn == -1) rn = GetRowNum(BlockDequeue(), out error);

if (!string.IsNullOrEmpty(error))

{

result.Type = ResponseType.Error;

result.Data = error;

break;

}

if (rn > 0)

{

for (int i = 0; i < rn; i++)

{

len = GetWordsNum(BlockDequeue(), out error);

sb.AppendLine(BlockDequeue());

}

}

result.Data = sb.ToString();

break;

case RequestType.HGETALL:

case RequestType.ZRANGE:

case RequestType.ZREVRANGE:

result.Type = ResponseType.KeyValues;

sb = new StringBuilder();

rn = GetRowNum(BlockDequeue(), out error);

if (!string.IsNullOrEmpty(error))

{

result.Type = ResponseType.Error;

result.Data = error;

break;

}

if (rn > 0)

{

for (int i = 0; i < rn; i++)

{

len = GetWordsNum(BlockDequeue(), out error);

sb.AppendLine(BlockDequeue());

}

}

result.Data = sb.ToString();

break;

case RequestType.DBSIZE:

case RequestType.EXISTS:

case RequestType.EXPIRE:

case RequestType.PERSIST:

case RequestType.SETNX:

case RequestType.HEXISTS:

case RequestType.HLEN:

case RequestType.LLEN:

case RequestType.LPUSH:

case RequestType.RPUSH:

case RequestType.LREM:

case RequestType.SADD:

case RequestType.SCARD:

case RequestType.SISMEMBER:

case RequestType.SREM:

case RequestType.ZADD:

case RequestType.ZCARD:

case RequestType.ZCOUNT:

case RequestType.ZREM:

case RequestType.PUBLISH:

var val = GetValue(BlockDequeue(), out error);

if (!string.IsNullOrEmpty(error))

{

result.Type = ResponseType.Error;

result.Data = error;

break;

}

if (val == 0)

{

result.Type = ResponseType.Empty;

}

else

{

result.Type = ResponseType.OK;

}

result.Data = val.ToString();

break;

case RequestType.INFO:

var rnum = GetWordsNum(BlockDequeue(), out error);

if (!string.IsNullOrEmpty(error))

{

result.Type = ResponseType.Error;

result.Data = error;

break;

}

var info = "";

while (info.Length < rnum)

{

info += BlockDequeue();

}

result.Type = ResponseType.String;

result.Data = info;

break;

case RequestType.SUBSCRIBE:

var r = "";

while (IsSubed)

{

r = BlockDequeue();

if (r == "message\r\n")

{

result.Type = ResponseType.Sub;

BlockDequeue();

result.Data = BlockDequeue();

BlockDequeue();

result.Data += BlockDequeue();

break;

}

}

break;

case RequestType.UNSUBSCRIBE:

var rNum = GetRowNum(BlockDequeue(), out error);

var wNum = GetWordsNum(BlockDequeue(), out error);

BlockDequeue();

wNum = GetWordsNum(BlockDequeue(), out error);

var channel = BlockDequeue();

var vNum = GetValue(BlockDequeue(), out error);

IsSubed = false;

break;

}

_autoResetEvent.Set();

return result;

}

命令的封装与测试

有了socket、redisCoder之后,现在就可以按照官方的redis命令来进行.net core的封装了。

本人将这些操作封装到RedisClient、RedisDataBase两个类中,然后又想到连接复用的问题,简单实现了一个连接池RedisClientFactory的类。这样一来就可以好好的来实验一把,看看之前的设想最终能不能实现了:

/****************************************************************************

*Copyright (c) 2018 Microsoft All Rights Reserved.

*CLR版本:4.0.30319.42000

*机器名称:WENLI-PC

*公司名称:Microsoft

*命名空间:SAEA.RedisSocketTest

*文件名:Program

*版本号:V1.0.0.0

*唯一标识:3d4f939c-3fb9-40e9-a0e0-c7ec773539ae

*当前的用户域:WENLI-PC

*创建人:yswenli

*电子邮箱:wenguoli_520@qq.com

*创建时间:2018/3/17 10:37:15

*描述:

*

*=====================================================================

*修改标记

*修改时间:2018/3/19 10:37:15

*修改人:yswenli

*版本号:V1.0.0.0

*描述:

*

*****************************************************************************/

using SAEA.Commom;

using SAEA.RedisSocket;

using System;

namespace SAEA.RedisSocketTest

{

class Program

{

static void Main(string[] args)

{

ConsoleHelper.Title = "SAEA.RedisSocketTest";

ConsoleHelper.WriteLine("输入ip:port连接RedisServer");

var ipPort = ConsoleHelper.ReadLine();

if (string.IsNullOrEmpty(ipPort))

{

ipPort = "127.0.0.1:6379";

}

RedisClient redisClient = new RedisClient(ipPort);

redisClient.Connect(); 

//redisClient.Connect("wenli"); 

var info = redisClient.Info();

if (info.Contains("NOAUTH Authentication required."))

{

while (true)

{

ConsoleHelper.WriteLine("请输入redis连接密码");

var auth = ConsoleHelper.ReadLine();

if (string.IsNullOrEmpty(auth))

{

auth = "yswenli";

}

var a = redisClient.Auth(auth);

if (a.Contains("OK"))

{

break;

}

else

{

ConsoleHelper.WriteLine(a);

}

}

}

//redisConnection.SlaveOf();

//redisConnection.Ping();

redisClient.Select(1);

//ConsoleHelper.WriteLine(redisConnection.Type("key0"));

ConsoleHelper.WriteLine("dbSize:{0}", redisClient.DBSize().ToString());

RedisOperationTest(redisClient, true);

ConsoleHelper.ReadLine();

}

private static void RedisOperationTest(object sender, bool status)

{

RedisClient redisClient = (RedisClient)sender;

if (status)

{

ConsoleHelper.WriteLine("连接redis服务器成功!");

#region key value

ConsoleHelper.WriteLine("回车开始kv插值操作...");

ConsoleHelper.ReadLine();

for (int i = 0; i < 1000; i++)

{

redisClient.GetDataBase().Set("key" + i, "val" + i);

}

//redisConnection.GetDataBase().Exists("key0");

ConsoleHelper.WriteLine("kv插入完成...");

ConsoleHelper.WriteLine("回车开始获取kv值操作...");

ConsoleHelper.ReadLine();

var keys = redisClient.GetDataBase().Keys().Data.ToArray(false, "\r\n");

foreach (var key in keys)

{

var val = redisClient.GetDataBase().Get(key);

ConsoleHelper.WriteLine("Get val:" + val);

}

ConsoleHelper.WriteLine("获取kv值完成...");

ConsoleHelper.WriteLine("回车开始开始kv移除操作...");

ConsoleHelper.ReadLine();

foreach (var key in keys)

{

redisClient.GetDataBase().Del(key);

}

ConsoleHelper.WriteLine("移除kv值完成...");

#endregion

#region hashset

string hid = "wenli";

ConsoleHelper.WriteLine("回车开始HashSet插值操作...");

ConsoleHelper.ReadLine();

for (int i = 0; i < 1000; i++)

{

redisClient.GetDataBase().HSet(hid, "key" + i, "val" + i);

}

ConsoleHelper.WriteLine("HashSet插值完成...");

ConsoleHelper.WriteLine("回车开始HashSet插值操作...");

ConsoleHelper.ReadLine();

var hkeys = redisClient.GetDataBase().GetHKeys(hid).Data.ToArray();

foreach (var hkey in hkeys)

{

var val = redisClient.GetDataBase().HGet(hid, hkey);

ConsoleHelper.WriteLine("HGet val:" + val.Data);

}

var hall = redisClient.GetDataBase().HGetAll("wenli");

ConsoleHelper.WriteLine("HashSet查询完成...");

ConsoleHelper.WriteLine("回车开始HashSet移除操作...");

ConsoleHelper.ReadLine();

foreach (var hkey in hkeys)

{

redisClient.GetDataBase().HDel(hid, hkey);

}

ConsoleHelper.WriteLine("HashSet移除完成...");

#endregion

//redisConnection.GetDataBase().Suscribe((c, m) =>

//{

//    ConsoleHelper.WriteLine("channel:{0} msg:{1}", c, m);

//    redisConnection.GetDataBase().UNSUBSCRIBE(c);

//}, "c39654");

ConsoleHelper.WriteLine("测试完成!");

}

else

{

ConsoleHelper.WriteLine("连接失败!");

}

}

}

}

上一篇:随时随地编写 .NET应用程序
下一篇:Core中的并发编程
选择城市和中心
贵州省

广西省

海南省