无锁编程(高并发情况下怎样尽量实现无锁编程)
大家好,今天我将向大家分享有关无锁编程和高并发情况下怎样尽量实现无锁编程的一些独特见解,希望能够为你们带来新的思考和启示。
并发编程解惑之线程
主要内容:
进程是资源分配的最小单位,每个进程都有独立的代码和数据空间,一个进程包含 1到 n个线程。线程是 CPU调度的最小单位,每个线程有独立的运行栈和程序计数器,线程切换开销小。
Java程序总是从主类的 main方法开始执行,main方法就是 Java程序默认的主线程,而在 main方法中再创建的线程就是其他线程。在 Java中,每次程序启动至少启动 2个线程。一个是 main线程,一个是垃圾收集线程。每次使用 Java命令启动一个 Java程序,就相当于启动一个 JVM实例,而每个 JVM实例就是在操作系统中启动的一个进程。
多线程可以通过继承或实现接口的方式创建。
Thread类是 JDK中定义的用于控制线程对象的类,该类中封装了线程执行体 run()方法。需要强调的一点是,线程执行先后与创建顺序无关。
通过 Runnable方式创建线程相比通过继承 Thread类创建线程的优势是避免了单继承的局限性。若一个 boy类继承了 person类,boy类就无法通过继承 Thread类的方式来实现多线程。
使用 Runnable接口创建线程的过程:先是创建对象实例 MyRunnable,然后将对象 My Runnable作为 Thread构造方法的入参,来构造出线程。对于 new Thread(Runnable target)创建的使用同一入参目标对象的线程,可以共享该入参目标对象 MyRunnable的成员变量和方法,但 run()方法中的局部变量相互独立,互不干扰。
上面代码是 new了三个不同的 My Runnable对象,如果只想使用同一个对象,可以只 new一个 MyRunnable对象给三个 new Thread使用。
实现 Runnable接口比继承 Thread类所具有的优势:
线程有新建、可运行、阻塞、等待、定时等待、死亡 6种状态。一个具有生命的线程,总是处于这 6种状态之一。每个线程可以独立于其他线程运行,也可和其他线程协同运行。线程被创建后,调用 start()方法启动线程,该线程便从新建态进入就绪状态。
NEW状态(新建状态)实例化一个线程之后,并且这个线程没有开始执行,这个时候的状态就是 NEW状态:
RUNNABLE状态(就绪状态):
阻塞状态有 3种:
如果一个线程调用了一个对象的 wait方法,那么这个线程就会处于等待状态(waiting状态)直到另外一个线程调用这个对象的 notify或者 notifyAll方法后才会解除这个状态。
run()里的代码执行完毕后,线程进入终结状态(TERMINATED状态)。
线程状态有 6种:新建、可运行、阻塞、等待、定时等待、死亡。
我们看下 join方法的使用:
运行结果:
我们来看下 yield方法的使用:
运行结果:
线程与线程之间是无法直接通信的,A线程无法直接通知 B线程,Java中线程之间交换信息是通过共享的内存来实现的,控制共享资源的读写的访问,使得多个线程轮流执行对共享数据的操作,线程之间通信是通过对共享资源上锁或释放锁来实现的。线程排队轮流执行共享资源,这称为线程的同步。
Java提供了很多同步操作(也就是线程间的通信方式),同步可使用 synchronized关键字、Object类的 wait/notifyAll方法、ReentrantLock锁、无锁同步 CAS等方式来实现。
ReentrantLock是 JDK内置的一个锁对象,用于线程同步(线程通信),需要用户手动释放锁。
运行结果:
这表明同一时间段只能有 1个线程执行 work方法,因为 work方法里的代码需要获取到锁才能执行,这就实现了多个线程间的通信,线程 0获取锁,先执行,线程 1等待,线程 0释放锁,线程 1继续执行。
synchronized是一种语法级别的同步方式,称为内置锁。该锁会在代码执行完毕后由 JVM释放。
输出结果跟 ReentrantLock一样。
Java中的 Object类默认是所有类的父类,该类拥有 wait、 notify、notifyAll方法,其他对象会自动继承 Object类,可调用 Object类的这些方法实现线程间的通信。
除了可以通过锁的方式来实现通信,还可通过无锁的方式来实现,无锁同 CAS(Compare-and-Swap,比较和交换)的实现,需要有 3个操作数:内存地址 V,旧的预期值 A,即将要更新的目标值 B,当且仅当内存地址 V的值与预期值 A相等时,将内存地址 V的值修改为目标值 B,否则就什么都不做。
我们通过计算器的案例来演示无锁同步 CAS的实现方式,非线程安全的计数方式如下:
线程安全的计数方式如下:
运行结果:
线程安全累加的结果才是正确的,非线程安全会出现少计算值的情况。JDK 1.5开始,并发包里提供了原子操作的类,AtomicBoolean用原子方式更新的 boolean值,AtomicInteger用原子方式更新 int值,AtomicLong用原子方式更新 long值。 AtomicInteger和 AtomicLong还提供了用原子方式将当前值自增 1或自减 1的方法,在多线程程序中,诸如++i或 i++等运算不具有原子性,是不安全的线程操作之一。通常我们使用 synchronized将该操作变成一个原子操作,但 JVM为此种操作提供了原子操作的同步类 Atomic,使用 AtomicInteger做自增运算的性能是 ReentantLock的好几倍。
上面我们都是使用底层的方式实现线程间的通信的,但在实际的开发中,我们应该尽量远离底层结构,使用封装好的 API,例如 J.U.C包(java.util.concurrent,又称并发包)下的工具类 CountDownLath、CyclicBarrier、Semaphore,来实现线程通信,协调线程执行。
CountDownLatch能够实现线程之间的等待,CountDownLatch用于某一个线程等待若干个其他线程执行完任务之后,它才开始执行。
CountDownLatch类只提供了一个构造器:
CountDownLatch类中常用的 3个方法:
运行结果:
CyclicBarrier字面意思循环栅栏,通过它可以让一组线程等待至某个状态之后再全部同时执行。当所有等待线程都被释放以后,CyclicBarrier可以被重复使用,所以有循环之意。
相比 CountDownLatch,CyclicBarrier可以被循环使用,而且如果遇到线程中断等情况时,可以利用 reset()方法,重置计数器,CyclicBarrier会比 CountDownLatch更加灵活。
CyclicBarrier提供 2个构造器:
上面的方法中,参数 parties指让多少个线程或者任务等待至 barrier状态;参数 barrierAction为当这些线程都达到 barrier状态时会执行的内容。
CyclicBarrier中最重要的方法 await方法,它有 2个重载版本。下面方法用来挂起当前线程,直至所有线程都到达 barrier状态再同时执行后续任务。
而下面的方法则是让这些线程等待至一定的时间,如果还有线程没有到达 barrier状态就直接让到达 barrier的线程执行任务。
运行结果:
CyclicBarrier用于一组线程互相等待至某个状态,然后这一组线程再同时执行,CountDownLatch是不能重用的,而 CyclicBarrier可以重用。
Semaphore类是一个计数信号量,它可以设定一个阈值,多个线程竞争获取许可信号,执行完任务后归还,超过阈值后,线程申请许可信号时将会被阻塞。Semaphore可以用来构建对象池,资源池,比如数据库连接池。
假如在服务器上运行着若干个客户端请求的线程。这些线程需要连接到同一数据库,但任一时刻只能获得一定数目的数据库连接。要怎样才能够有效地将这些固定数目的数据库连接分配给大量的线程呢?
给方法加同步锁,保证同一时刻只能有一个线程去调用此方法,其他所有线程排队等待,但若有 10个数据库连接,也只有一个能被使用,效率太低。另外一种方法,使用信号量,让信号量许可与数据库可用连接数为相同数量,10个数据库连接都能被使用,大大提高性能。
上面三个工具类是 J.U.C包的核心类,J.U.C包的全景图就比较复杂了:
J.U.C包(java.util.concurrent)中的高层类(Lock、同步器、阻塞队列、Executor、并发容器)依赖基础类(AQS、非阻塞数据结构、原子变量类),而基础类是通过 CAS和 volatile来实现的。我们尽量使用顶层的类,避免使用基础类 CAS和 volatile来协调线程的执行。J.U.C包其他的内容,在其他的篇章会有相应的讲解。
Future是一种异步执行的设计模式,类似 ajax异步请求,不需要同步等待返回结果,可继续执行代码。使 Runnable(无返回值不支持上报异常)或 Callable(有返回值支持上报异常)均可开启线程执行任务。但是如果需要异步获取线程的返回结果,就需要通过 Future来实现了。
Future是位于 java.util.concurrent包下的一个接口,Future接口封装了取消任务,获取任务结果的方法。
在 Java中,一般是通过继承 Thread类或者实现 Runnable接口来创建多线程, Runnable接口不能返回结果,JDK 1.5之后,Java提供了 Callable接口来封装子任务,Callable接口可以获取返回结果。我们使用线程池提交 Callable接口任务,将返回 Future接口添加进 ArrayList数组,最后遍历 FutureList,实现异步获取返回值。
运行结果:
上面就是异步线程执行的调用过程,实际开发中用得更多的是使用现成的异步框架来实现异步编程,如 RxJava,有兴趣的可以继续去了解,通常异步框架都是结合远程 HTTP调用 Retrofit框架来使用的,两者结合起来用,可以避免调用远程接口时,花费过多的时间在等待接口返回上。
线程封闭是通过本地线程 ThreadLocal来实现的,ThreadLocal是线程局部变量(local vari able),它为每个线程都提供一个变量值的副本,每个线程对该变量副本的修改相互不影响。
在 JVM虚拟机中,堆内存用于存储共享的数据(实例对象),也就是主内存。Thread Local.set()、ThreadLocal.get()方法直接在本地内存(工作内存)中写和读共享变量的副本,而不需要同步数据,不用像 synchronized那样保证数据可见性,修改主内存数据后还要同步更新到工作内存。
Myabatis、hibernate是通过 threadlocal来存储 session的,每一个线程都维护着一个 session,对线程独享的资源操作很方便,也避免了线程阻塞。
ThreadLocal类位于 Thread线程类内部,我们分析下它的源码:
ThreadLocal和 Synchonized都用于解决多线程并发访问的问题,访问多线程共享的资源时,Synchronized同步机制采用了以时间换空间的方式,提供一份变量让多个线程排队访问,而 ThreadLocal采用了以空间换时间的方式,提供每个线程一个变量,实现数据隔离。
ThreadLocal可用于数据库连接 Connection对象的隔离,使得每个请求线程都可以复用连接而又相互不影响。
在 Java里面,存在强引用、弱引用、软引用、虚引用。我们主要来了解下强引用和弱引用:
上面 a、b对实例 A、B都是强引用
而上面这种情况就不一样了,即使 b被置为 null,但是 c仍然持有对 C对象实例的引用,而间接的保持着对 b的强引用,所以 GC不会回收分配给 b的空间,导致 b无法回收也没有被使用,造成了内存泄漏。这时可以通过 c= null;来使得 c被回收,但也可以通过弱引用来达到同样目的:
从源码中可以看出 Entry里的 key对 ThreadLocal实例是弱引用:
Entry里的 key对 ThreadLocal实例是弱引用,将 key值置为 null,堆中的 ThreadLocal实例是可以被垃圾收集器(GC)回收的。但是 value却存在一条从 Current Thread过来的强引用链,只有当当前线程 Current Thread销毁时,value才能被回收。在 threadLocal被设为 null以及线程结束之前,Entry的键值对都不会被回收,出现内存泄漏。为了避免泄漏,在 ThreadLocalMap中的 set/get Entry方法里,会对 key为 null的情况进行判断,如果为 null的话,就会对 value置为 null。也可以通过 ThreadLocal的 remove方法(类似加锁和解锁,最后 remove一下,解锁对象的引用)直接清除,释放内存空间。
总结来说,利用 ThreadLocal来访问共享数据时,JVM通过设置 ThreadLocalMap的 Key为弱引用,来避免内存泄露,同时通过调用 remove、get、set方法的时候,回收弱引用(Key为 null的 Entry)。当使用 static ThreadLocal的时候(如上面的 Spring多数据源),static变量在类未加载的时候,它就已经加载,当线程结束的时候,static变量不一定会被回收,比起普通成员变量使用的时候才加载,static的生命周期变长了,若没有及时回收,容易产生内存泄漏。
使用线程池,可以重用存在的线程,减少对象创建、消亡的开销,可控制最大并发线程数,避免资源竞争过度,还能实现线程定时执行、单线程执行、固定线程数执行等功能。
Java把线程的调用封装成了一个 Executor接口,Executor接口中定义了一个 execute方法,用来提交线程的执行。Executor接口的子接口是 ExecutorService,负责管理线程的执行。通过 Executors类的静态方法可以初始化
ExecutorService线程池。Executors类的静态方法可创建不同类型的线程池:
但是,不建议使用 Executors去创建线程池,而是通过 ThreadPoolExecutor的方式,明确给出线程池的参数去创建,规避资源耗尽的风险。
如果使用 Executors去创建线程池:
最佳的实践是通过 ThreadPoolExecutor手动地去创建线程池,选取合适的队列存储任务,并指定线程池线程大小。通过线程池实现类 ThreadPoolExecutor可构造出线程池的,构造函数有下面几个重要的参数:
参数 1:corePoolSize
线程池核心线程数。
参数 2:workQueue
阻塞队列,用于保存执行任务的线程,有 4种阻塞队列可选:
参数 3:maximunPoolSize
线程池最大线程数。如果阻塞队列满了(有界的阻塞队列),来了一个新的任务,若线程池当前线程数小于最大线程数,则创建新的线程执行任务,否则交给饱和策略处理。如果是无界队列就不存在这种情况,任务都在无界队列里存储着。
参数 4:RejectedExecutionHandler
拒绝策略,当队列满了,而且线程达到了最大线程数后,对新任务采取的处理策略。
有 4种策略可选:
最后,还可以自定义处理策略。
参数 5:ThreadFactory
创建线程的工厂。
参数 6:keeyAliveTime
线程没有任务执行时最多保持多久时间终止。当线程池中的线程数大于 corePoolSize时,线程池中所有线程中的某一个线程的空闲时间若达到 keepAliveTime,则会终止,直到线程池中的线程数不超过 corePoolSize。但如果调用了 allowCoreThread TimeOut(boolean value)方法,线程池中的线程数就算不超过 corePoolSize,keepAlive Time参数也会起作用,直到线程池中的线程数量变为 0。
参数 7:TimeUnit
配合第 6个参数使用,表示存活时间的时间单位最佳的实践是通过 ThreadPoolExecutor手动地去创建线程池,选取合适的队列存储任务,并指定线程池线程大小。
运行结果:
线程池创建线程时,会将线程封装成工作线程 Worker,Worker在执行完任务后,还会不断的去获取队列里的任务来执行。Worker的加锁解锁机制是继承 AQS实现的。
我们来看下 Worker线程的运行过程:
总结来说,如果当前运行的线程数小于 corePoolSize线程数,则获取全局锁,然后创建新的线程来执行任务如果运行的线程数大于等于 corePoolSize线程数,则将任务加入阻塞队列 BlockingQueue如果阻塞队列已满,无法将任务加入 BlockingQueue,则获取全局所,再创建新的线程来执行任务
如果新创建线程后使得线程数超过了 maximumPoolSize线程数,则调用 Rejected ExecutionHandler.rejectedExecution()方法根据对应的拒绝策略处理任务。
CPU密集型任务,线程执行任务占用 CPU时间会比较长,应该配置相对少的线程数,避免过度争抢资源,可配置 N个 CPU+1个线程的线程池;但 IO密集型任务则由于需要等待 IO操作,线程经常处于等待状态,应该配置相对多的线程如 2*N个 CPU个线程,A线程阻塞后,B线程能马上执行,线程多竞争激烈,能饱和的执行任务。线程提交 SQL后等待数据库返回结果时间较长的情况,CPU空闲会较多,线程数应设置大些,让更多线程争取 CPU的调度。
高并发情况下怎样尽量实现无锁编程
一个在线2k的游戏,每秒钟并发都吓死人。传统的hibernate直接插库基本上是不可行的。我就一步步推导出一个无锁的数据库操作。
1.并发中如何无锁。
一个很简单的思路,把并发转化成为单线程。Java的Disruptor就是一个很好的例子。如果用java的concurrentCollection类去做,原理就是启动一个线程,跑一个Queue,并发的时候,任务压入Queue,线程轮训读取这个Queue,然后一个个顺序执行。
在这个设计模式下,任何并发都会变成了单线程操作,而且速度非常快。现在的node.js,或者比较普通的ARPG服务端都是这个设计,“大循环”架构。
这样,我们原来的系统就有了2个环境:并发环境+”大循环“环境
并发环境就是我们传统的有锁环境,性能低下。
”大循环“环境是我们使用Disruptor开辟出来的单线程无锁环境,性能强大。
2.”大循环“环境中如何提升处理性能。
一旦并发转成单线程,那么其中一个线程一旦出现性能问题,必然整个处理都会放慢。所以在单线程中的任何操作绝对不能涉及到IO处理。那数据库操作怎么办?
增加缓存。这个思路很简单,直接从内存读取,必然会快。至于写、更新操作,采用类似的思路,把操作提交给一个Queue,然后单独跑一个Thread去一个个获取插库。这样保证了“大循环”中不涉及到IO操作。
问题再次出现:
如果我们的游戏只有个大循环还容易解决,因为里面提供了完美的同步无锁。
但是实际上的游戏环境是并发和“大循环”并存的,即上文的2种环境。那么无论我们怎么设计,必然会发现在缓存这块上要出现锁。
3.并发与“大循环”如何共处,消除锁?
我们知道如果在“大循环”中要避免锁操作,那么就用“异步”,把操作交给线程处理。结合这2个特点,我稍微改下数据库架构。
原本的缓存层,必然会存在着锁,例如:
public TableCache
{
private HashMap<String, Object> caches= new ConcurrentHashMap<String, Object>();
}
这个结构是必然的了,保证了在并发的环境下能够准确的操作缓存。但是”大循环“却不能直接操作这个缓存进行修改,所以必须启动一个线程去更新缓存,例如:
private static final ExecutorService EXECUTOR= Executors.newSingleThreadExecutor();
EXECUTOR.execute(new LatencyProcessor(logs));
class LatencyProcessor implements Runnable
{
public void run()
{
//这里可以任意的去修改内存数据。采用了异步。
}
}
OK,看起来很漂亮。但是又有个问题出现了。在高速存取的过程中,非常有可能缓存还没有被更新,就被其他请求再次获取,得到了旧的数据。
4.如何保证并发环境下缓存数据的唯一正确?
我们知道,如果只有读操作,没有写操作,那么这个行为是不需要加锁的。
我使用这个技巧,在缓存的上层,再加一层缓存,成为”一级缓存“,原来的就自然成为”二级缓存“。有点像CPU了对不?
一级缓存只能被”大循环“修改,但是可以被并发、”大循环“同时获取,所以是不需要锁的。
当发生数据库变动,分2种情况:
1)并发环境下的数据库变动,我们是允许有锁的存在,所以直接操作二级缓存,没有问题。
2)”大循环“环境下数据库变动,首先我们把变动数据存储在一级缓存,然后交给异步修正二级缓存,修正后删除一级缓存。
这样,无论在哪个环境下读取数据,首先判断一级缓存,没有再判断二级缓存。
这个架构就保证了内存数据的绝对准确。
而且重要的是:我们有了一个高效的无锁空间,去实现我们任意的业务逻辑。
最后,还有一些小技巧提升性能。
1.既然我们的数据库操作已经被异步处理,那么某个时间,需要插库的数据可能很多,通过对表、主键、操作类型的排序,我们可以删除一些无效操作。例如:
a)同一个表同一个主键的多次UPdate,取最后一次。
b)同一个表同一个主键,只要出现Delete,前面所有操作无效。
2.既然我们要对操作排序,必然会存在一个根据时间排序,如何保证无锁呢?使用
private final static AtomicLong _seq= new AtomicLong(0);
即可保证无锁又全局唯一自增,作为时间序列。
关于无锁编程到此分享完毕,希望能帮助到您。