memcached源码解析--java客户端

分享到:
  来源:CSDN  发布时间:2013-12-11  浏览次数:3381

  memcached

 题记:

    自己一直在EE企业级混着,最近想转型网络应用和产品这块,就来学习下memcached客户端分布式缓存,memcached是用c写的简单缓存,应用socket来交换数据。

阿里的岑文初,就职于阿里软件公司研发中心平台一部,任架构师。当前主要工作涉及阿里软件开发平台服务框架(ASF)设计与实现,服务集成平台(SIP)设计与实现。

    开源了一个for java版的memcached客户端(http://code.google.com/p/memcache-client-forjava ),同时加入了LocalCached本地缓存的简单实现用于优化memcached,于是想学习下,希望对自己有所帮助。

 

引子——本地缓存

    wenchu写了个简单的本地缓存,目的是让应用在进行memcached查询之前,再做次缓存以提高效率。实现很简单,用ConcurrentHashMap来缓存,并设置了一个简单的超时机制,利用ScheduledExecutorService开启1个线程定时遍历所有map,并清理过期的数据。如下:

DefaultCacheImpl.java

 

[java] view plaincopy
 
  1. /** 
  2.      * 具体内容存放的地方 
  3.      */  
  4.     ConcurrentHashMap<String, Object>[] caches;  
  5.     /** 
  6.      * 超期信息存储,存储所有的key,超时时间 
  7.      */  
  8.     ConcurrentHashMap<String, Long> expiryCache;  
  9.       
  10.     /** 
  11.      * 清理超期内容的服务 
  12.      */  
  13.     private  ScheduledExecutorService scheduleService;  
  14.       
  15.     /** 
  16.      * 清理超期信息的时间间隔,默认10分钟 
  17.      */  
  18.     private int expiryInterval = 10;  
  19.       
  20.     /** 
  21.      * 内部cache的个数,根据key的hash对module取模来定位到具体的某一个内部的Map, 
  22.      * 减小阻塞情况发生。 
  23.      */  
  24.     private int moduleSize = 10;  
  25.       

 

注意:

/**
  * 内部cache的个数,根据key的hash对module取模来定位到具体的某一个内部的Map,
  * 减小阻塞情况发生。
  */

 

所以每次get和put的时候,需要取模计算用哪个map,如下:

 DefaultCacheImpl.java

 

[java] view plaincopy
 
  1. private ConcurrentHashMap<String, Object>getCache(String key)  
  2.     {  
  3.         long hashCode = (long)key.hashCode();  
  4.           
  5.         if (hashCode < 0)  
  6.             hashCode = -hashCode;  
  7.           
  8.         int moudleNum = (int)hashCode % moduleSize;  
  9.           
  10.         return caches[moudleNum];  
  11.     }  

 

如果换我实现,我肯定就一个ConcurrentHashMap了,向前辈学习:)

 

本地缓存的超时机制——能否优化?

    为了建立超时机制,作者除开用了Map数组来存储key-value的值,还需要存储超时时间,而expiryCache这个Map就是用来控制时间的,他存储key和ttl。所以在每次put的时候,expiryCache需要设定这对key-value的超时时间,而在每次get的时候需要判断取的key是否超时,如果这个key超时了,则返回空。

    这里每次都做了判断了,我想是不是能减去这个判断?

    DefaultCacheImpl.java

 

[java] view plaincopy
 
  1. // 判断函数,判断某key-value是否超时  
  2. private void checkValidate(String key)  
  3.     {  
  4.         if (key != null && expiryCache.get(key) != null && expiryCache.get(key) != -1   
  5.                 && new Date(expiryCache.get(key)).before(new Date()))  
  6.         {  
  7.             getCache(key).remove(key);  
  8.             expiryCache.remove(key);  
  9.         }  
  10.     }  
  11.   
  12.   
  13. // get函数  
  14. public Object get(String key)  
  15.     {  
  16.         checkValidate(key);// 每次都check了  
  17.         return getCache(key).get(key);  
  18.     }  

 

 

  从另一个角度说,get如果不每次检查有效性是有可能获取到“超时信息”的,所以为了保证一定取到有效数据。

    如果去掉“定时清理”线程,只留get来做清理工作的话,那么又会存在某个从来没有get过key-value永远存在内存里。我觉得如果性能有要求的话,是可以去掉清理线程的。

 

 

上层应用对集群的封装——MemcachedCache.java

    此类是面向使用者的封装类,封装了底层memcachedClient的操作,他对外提供了clear(),put(),get(),remove(),add(),replace()等等需要用到的方法。对于上层应用来说,并不需要关心是本地缓存还是远程获取值,并且也不用关心与某个memcached服务器的通讯细节。

1,clear

    如果做了集群,则调用所有集群进行flushAll()

    可以看看flushAll方法:

 

[java] view plaincopy
 
  1. public boolean clear()  
  2.  {  
  3.   boolean result = false;  
  4.     
  5.   if (helper.hasCluster())  
  6.   {  
  7.    List<MemCachedClient> caches = helper.getClusterCache();  
  8.      
  9.    for(MemCachedClient cache : caches)  
  10.    {  
  11.     try  
  12.     {  
  13.      result = cache.flushAll(null);  
  14.     }  
  15.     catch(Exception ex)  
  16.     {  
  17.      Logger.error(new StringBuilder(helper.getCacheName())  
  18.         .append(" cluster clear error"),ex);  
  19.      result = false;  
  20.     }  
  21.    }  
  22.      
  23.    return result;  
  24.      
  25.   }  
  26.   else  
  27.    return helper.getInnerCacheClient().flushAll( null );  
  28.  }  

 

 

2,put

   如果集群,则构造命令入命令队列,并异步发送给所有集群执行;

   如果非集群,则只发送给某memcached执行命令;

 

 

 

[java] view plaincopy
 
  1. public Object put(String key, Object value, Date expiry)  
  2.     {  
  3.   
  4.         boolean result = getCacheClient(key).set(key,value,expiry);  
  5.           
  6.         //移除本地缓存的内容  
  7.         if (result)  
  8.             localCache.remove(key);  
  9.           
  10.         if (helper.hasCluster())  
  11.         {  
  12.             Object[] commands = new Object[]{CacheCommand.PUT,key,value,expiry};  
  13.               
  14.             addCommandToQueue(commands);// 构造的命令加入命令队列,让异步线程发送给所有集群memcached服务器  
  15.         }  
  16.         else  
  17.             if (!result)  
  18.                 throw new java.lang.RuntimeException  
  19.                     (new StringBuilder().append("put key :").append(key).append(" error!").toString());  
  20.           
  21.         return value;  
  22.     }  

 

....其他几个方法都是大同小异了。

 

真正的底层实现——MemCachedClient.java

 

 

底层Socket通讯模块——SockIOPool.java

    首先这个多io池采用多例模式,自己管理自己的实例并对外提供。

 

 

[java] view plaincopy
 
  1. // store instances of pools  
  2.     private static ConcurrentMap<String, SockIOPool> pools = new ConcurrentHashMap<String, SockIOPool>();  

 

而且pools管理的是多个连接池,每个SockIOPool管理着多个Sock,key是socket对象,value是socket状态,如下:

 

[java] view plaincopy
 
  1. private ConcurrentMap<String, ConcurrentMap<SockIO, Integer>> socketPool;  

 

 可以看到这里的SockIO类是一个静态内部类,主要工作就是管理一个Socket连接(TCP),给Memcached服务器发送命令并接收答复。

  比如包括,新建立一个TCP连接(初始化host,port,timeout等),发送命令,接收答复。

 

[java] view plaincopy
 
  1. public static class SockIO  
  2.     {  
  3.         // pool  
  4.         private SockIOPool pool;  
  5.   
  6.         // data  
  7.         private String host;  
  8.         private Socket sock;  
  9.   
  10.         private DataInputStream in;  
  11.         private BufferedOutputStream out;  
  12.           
  13.         private byte[] recBuf;  
  14.           
  15.         private int recBufSize = 1028;  
  16.         private int recIndex = 0;  
  17.         //判断是否需要检查链接处于可用状态  
  18.         private long aliveTimeStamp = 0;  
  19.   
  20.         public SockIO(SockIOPool pool, String host, int port, int timeout,  
  21.                 int connectTimeout, boolean noDelay) throws IOException,  
  22.                 UnknownHostException  
  23.         {  
  24.   
  25.             this.pool = pool;  
  26.               
  27.             recBuf = new byte[recBufSize];  
  28.   
  29.             // get a socket channel  
  30.             sock = getSocket(host, port, connectTimeout);  
  31.   
  32.             if (timeout >= 0)  
  33.                 sock.setSoTimeout(timeout);  
  34.   
  35.             // testing only  
  36.             sock.setTcpNoDelay(noDelay);  
  37.   
  38.             // wrap streams  
  39.             in = new DataInputStream(sock.getInputStream());  
  40.             out = new BufferedOutputStream(sock.getOutputStream());  
  41.   
  42.             this.host = host + ":" + port;  
  43.         }  
  44. ....  
  45. }  

 

发送命令:

 

 

[java] view plaincopy
 
  1. void flush() throws IOException  
  2. {  
  3.     if (sock == null || !sock.isConnected())  
  4.     {  
  5.         log.error("++++ attempting to write to closed socket");  
  6.         throw new IOException(  
  7.                 "++++ attempting to write to closed socket");  
  8.     }  
  9.     out.flush();  
  10. }  
  11.   
  12.   
  13. void write(byte[] b) throws IOException  
  14. {  
  15.     if (sock == null || !sock.isConnected())  
  16.     {  
  17.         log.error("++++ attempting to write to closed socket");  
  18.         throw new IOException(  
  19.                 "++++ attempting to write to closed socket");  
  20.     }  
  21.     out.write(b);  
  22. }  

 

接收答复:

 

[java] view plaincopy
 
  1. public String readLine() throws IOException  
  2. {  
  3.     if (sock == null || !sock.isConnected())  
  4.     {  
  5.         throw new IOException("++++ attempting to read from closed socket");  
  6.     }  
  7.   
  8.     String result = null;  
  9.     ByteArrayOutputStream bos = new ByteArrayOutputStream();  
  10.     //StringBuilder content = new StringBuilder();  
  11.     int readCount = 0;  
  12.       
  13.     // some recbuf releave  
  14.     if (recIndex > 0 && read(bos))  
  15.     {  
  16.         return bos.toString();  
  17.     }  
  18.       
  19.     while((readCount = in.read(recBuf,recIndex,recBuf.length - recIndex)) > 0)  
  20.     {  
  21.         recIndex = recIndex + readCount;  
  22.           
  23.         if (read(bos))  
  24.             break;        
  25.     }  
  26.               
  27.     result = bos.toString();  
  28.       
  29.     if (result == null || (result != null && result.length() <= 0 && recIndex <= 0))  
  30.     {  
  31.         throw new IOException("++++ Stream appears to be dead, so closing it down");  
  32.     }  
  33.       
  34.     //update alive state  
  35.     aliveTimeStamp = System.currentTimeMillis();  
  36.   
  37.     return result;  
  38.   
  39. }  

 

关于底层的socket通讯作者已经通过SockIO这个内部静态类已经写出来了,下面的工作就是Pool如何管理这些socket,并下达命令。

 

看上面MemcachedCache.java中的put方法

public Object put(String key, Object value, Date expiry)
{
  boolean result = getCacheClient(key).set(key,value,expiry);
...}

 

然后根据key的hashcode与集群大小取模操作,选择某个memcached服务器组:

 

public MemCachedClient getCacheClient(String key)
 {
  if (cacheClient == null)
  {
   Logger.error("cacheClient can't be injected into MemcachedCacheHelper");
   throw new java.lang.RuntimeException("cacheClient can't be injected into MemcachedCacheHelper");
  }
  
   if (hasCluster())
  {
   List<MemCachedClient> clusters = getClusterCache();
   
   long keyhash = key.hashCode();
   
   int index = (int)keyhash % clusters.size();
   
   if (index < 0 )
    index *= -1;
   
   return clusters.get(index);
   
  }
  else
   return cacheClient;
 }

 

取到Client后,调用MemCachedClient的set方法:

public boolean set( String key, Object value, Date expiry ) {
  return set( "set", key, value, expiry, null, primitiveAsString );
 }

 

[java] view plaincopy
 
  1. private boolean set( String cmdname, String key, Object value, Date expiry, Integer hashCode, boolean asString ) {  
  2.   
  3.         // 参数合法性检查  
  4.         try {  
  5.             key = sanitizeKey( key );// 是否进行UTF-8编码  
  6.         }  
  7.         catch ( UnsupportedEncodingException e ) {  
  8.         // 省略  
  9.         }  
  10.   
  11.         if ( value == null ) {  
  12.             log.error( "trying to store a null value to cache" );  
  13.             return false;  
  14.         }  
  15.   
  16.         // 通过key找到相应的sock  
  17.         SockIOPool.SockIO sock = pool.getSock( key, hashCode );  
  18.           
  19.         if ( sock == null ) {  
  20.             if ( errorHandler != null )  
  21.                 errorHandler.handleErrorOnSet( thisnew IOException( "no socket to server available" ), key );  
  22.             return false;  
  23.         }  
  24.           
  25.         if ( expiry == null )  
  26.             expiry = new Date(0);  
  27.   
  28.         // store flags  
  29.         int flags = 0;  
  30.           
  31.         // byte array to hold data  
  32.         byte[] val;  
  33.   
  34.         if ( NativeHandler.isHandled( value ) ) {  
  35.         // 略 对value的值进行非序列化,根据value的类型转换成byte     
  36.         }  
  37.         else {  
  38.             // 略 always serialize for non-primitive types 对value对象进行序列化  
  39.         }  
  40.           
  41.         // now try to compress if we want to  
  42.         // and if the length is over the threshold   
  43.         if ( compressEnable && val.length > compressThreshold ) {  
  44.                                           // 略 对value进行压缩工作  
  45.         }  
  46.   
  47.         // now write the data to the cache server  
  48.         try {  
  49.             String cmd = new StringBuilder().append(cmdname).append(" ")  
  50.                             .append(key).append(" ").append(flags).append(" ")  
  51.                                 .append(expiry.getTime() / 1000).append(" ")  
  52.                                     .append(val.length).append("/r/n").toString();  
  53.                   
  54.             sock.write( cmd.getBytes() );         
  55.               
  56.             sock.write( val );  
  57.             sock.write(B_RETURN);  
  58.             sock.flush();  
  59.   
  60.             // get result code  
  61.             String line = sock.readLine();  
  62.               
  63.             if (log.isInfoEnabled())  
  64.                 log.info( new StringBuilder().append("++++ memcache cmd (result code): ").append(cmd)  
  65.                         .append(" (").append(line).append(")").toString() );  
  66.   
  67.             if ( STORED.equals( line ) ) {  
  68.                   
  69.                 if (log.isInfoEnabled())  
  70.                     log.info(new StringBuilder().append("++++ data successfully stored for key: ").append(key).toString() );  
  71.                   
  72.                 sock.close();  
  73.                 sock = null;  
  74.                 return true;  
  75.             }  
  76.             else if ( NOTSTORED.equals( line ) )   
  77.             {  
  78.                 if (log.isInfoEnabled())  
  79.                     log.info( new StringBuilder().append("++++ data not stored in cache for key: ").append(key).toString() );  
  80.             }  
  81.             else {  
  82.                   
  83.                 log.error( new StringBuilder().append("++++ error storing data in cache for key: ")  
  84.                         .append(key).append(" -- length: ").append(val.length).toString() );  
  85.                 log.error( new StringBuilder().append("++++ server response: ").append(line).toString() );  
  86.             }  
  87.         }  
  88.         catch ( IOException e ) {  
  89.   
  90.             // if we have an errorHandler, use its hook  
  91.             if ( errorHandler != null )  
  92.                 errorHandler.handleErrorOnSet( this, e, key );  
  93.   
  94.             // exception thrown  
  95.             log.error( "++++ exception thrown while writing bytes to server on set" );  
  96.             log.error( e.getMessage(), e );  
  97.   
  98.             try {  
  99.                 sock.trueClose();  
  100.             }  
  101.             catch ( IOException ioe ) {  
  102.                 log.error( new StringBuilder().append("++++ failed to close socket : ").append(sock.toString()).toString() );  
  103.             }  
  104.   
  105.             sock = null;  
  106.         }  
  107.   
  108.         if ( sock != null ) {  
  109.             sock.close();  
  110.             sock = null;  
  111.         }  
  112.   
  113.         return false;  
  114.     }  

 

 

 可以看到Memcached类主要做了一个本地化缓存和选择集群调用相应MemcachedClient的工作,而MemcachedClient做了数据准备的工作,包括编码的转换,对象的序列化,压缩。

    前期数据的准备工作做好之后,就需要存储数据了,这里就会遇到选择哪个服务器的问题。

    而SockIOPool类主要做了策略选择某个Server,并提供此Server的SockIO工作。作者是通过key和hashCode的计算选定某个Server,然后通过SockIO sock = getConnection(server);从这个server池中取得一个有效的SockIO(甚至是重新创建一个新的Socket),如果所有socket都无效的话,则重新计算hashcode,选择一个新的Server来做,如此遍历下去直到找到一个有效的SockIO。

 

 

     SockIOPool初始化的时候建立一个名为“default”的连接池,然后为Server[]数组中每个Server创建SockIO池,保持长连接(因为本地端口资源有限,需要限制最大连接池数),通过createSocket方法,这个方法首先判断需要创建的host是否在失败过,如果失败过则根据条件重新创建,并设置状态为SOCKET_STATUS_ACTIVE,也就是sockets = new ConcurrentHashMap<SockIO, SOCKET_STATUS_ACTIVE>();  pool.putIfAbsent(host, sockets);,如果创建失败,则入失败的Map,这个Map的作用避免每次为失败过的Server进行重新创建,到这时,所有的Server就创建好了。

所以实际上最重要的两个方法是getSock和getConnection,getSock是根据key来计算并选择某个Server,getConnection则根据某个Server来从池中获取某个SockIO。

上层类MemCachedClient不关心底层选择的是哪个Server,只要拿到SockIO后,便可以进行操作了。

 

 

异步数据同步

    异步数据同步是通过ExecutorService开启线程,读取队列中的命令集并向集群中所有memcached服务器发送命令来实现数据同步。 

 

 对于多个memcache服务器,要做到负载平衡是通过以下代码来选择服务器的

 

 

[java] view plaincopy
 
  1. public final SchoonerSockIO getSock(String key, Integer hashCode) {  
  2.   
  3.         if (!this.initialized) {  
  4.             log.error("attempting to get SockIO from uninitialized pool!");  
  5.             return null;  
  6.         }  
  7.   
  8.         // if no servers return null  
  9.         int size = 0;  
  10. //如果没有bucket返回空  
  11.         if ((this.hashingAlg == CONSISTENT_HASH && consistentBuckets.size() == 0)  
  12.                 || (buckets != null && (size = buckets.size()) == 0))  
  13.             return null;  
  14. //如果有一个bucket则直接返回或者生成唯一的sock的  
  15.         else if (size == 1) {  
  16.             SchoonerSockIO sock = (this.hashingAlg == CONSISTENT_HASH) ? getConnection(consistentBuckets  
  17.                     .get(consistentBuckets.firstKey())) : getConnection(buckets.get(0));  
  18.   
  19.             return sock;  
  20.         }  
  21.   
  22.         // from here on, we are working w/ multiple servers  
  23.         // keep trying different servers until we find one  
  24.         // making sure we only try each server one time  
  25.         Set<String> tryServers = new HashSet<String>(Arrays.asList(servers));  
  26.         // get initial bucket  
  27. //通过getBucket的算法返回bucket的位置  
  28.         long bucket = getBucket(key, hashCode);  
  29. //返回相对应的server就是host  
  30.         String server = (this.hashingAlg == CONSISTENT_HASH) ? consistentBuckets.get(bucket) : buckets  
  31.                 .get((int) bucket);  
  32. //一直遍历找到或者生成server对应soct 只要还有剩下的服务   
  33.         while (!tryServers.isEmpty()) {  
  34.             // try to get socket from bucket  
  35.             SchoonerSockIO sock = getConnection(server);  
  36.             if (sock != null)  
  37.                 return sock;  
  38.   
  39.             // if we do not want to failover, then bail here  
  40.             if (!failover)  
  41.                 return null;  
  42.             // log that we tried  
  43.             tryServers.remove(server);  
  44.             if (tryServers.isEmpty())  
  45.                 break;  
  46.             // if we failed to get a socket from this server  
  47.             // then we try again by adding an incrementer to the  
  48.             // current key and then rehashing  
  49.             int rehashTries = 0;  
  50.             while (!tryServers.contains(server)) {  
  51.                 String newKey = new StringBuffer().append(rehashTries).append(key).toString();  
  52.                 // String.format( "%s%s", rehashTries, key );  
  53.                 bucket = getBucket(newKey, null);  
  54.                 server = (this.hashingAlg == CONSISTENT_HASH) ? consistentBuckets.get(bucket) : buckets  
  55.                         .get((int) bucket);  
  56.                 rehashTries++;  
  57.             }  
  58.         }  
  59.         return null;  
  60.     }  
  61.    

 

 

对于对象数据的存储 首先数据应该实序列化的 否则会报异常  如果存储形式是基础类型的值  回来cmd 中保存类型的对应的标识 直接通过命令行把 key value值发送过去

如果是对象类型

通过 ObjectOutputStream   ObjectInputStream 进行数据的传输

保存到服务器的数据会被压缩  客户端得到的数据要解压

 

来源:

http://blog.csdn.net/ma309385560/article/details/6307659

 

 

知识文章分类

Java

阅读排行