转 Java并发编程-无锁CAS与Unsafe类及其并发包Atomic
2829 | 0 | 0
再谈自旋锁
执行函数:CAS(V,E,N)
//分配内存指定大小的内存 public native long allocateMemory(long bytes); //根据给定的内存地址address设置重新分配指定大小的内存 public native long reallocateMemory(long address, long bytes); //用于释放allocateMemory和reallocateMemory申请的内存 public native void freeMemory(long address); //将指定对象的给定offset偏移量内存块中的所有字节设置为固定值 public native void setMemory(Object o, long offset, long bytes, byte value); //设置给定内存地址的值 public native void putAddress(long address, long x); //获取指定内存地址的值 public native long getAddress(long address); //设置给定内存地址的long值 public native void putLong(long address, long x); //获取指定内存地址的long值 public native long getLong(long address); //设置或获取指定内存的byte值 public native byte getByte(long address); public native void putByte(long address, byte x); //其他基本数据类型(long,char,float,double,short等)的操作与putByte及getByte相同 //操作系统的内存页大小 public native int pageSize();
//传入一个对象的class并创建该实例对象,但不会调用构造方法 public native Object allocateInstance(Class cls) throws InstantiationException;
//获取字段f在实例对象中的偏移量 public native long objectFieldOffset(Field f); //静态属性的偏移量,用于在对应的Class对象中读写静态属性 public native long staticFieldOffset(Field f); //返回值就是f.getDeclaringClass() public native Object staticFieldBase(Field f); //获得给定对象偏移量上的int值,所谓的偏移量可以简单理解为指针指向该变量的内存地址, //通过偏移量便可得到该对象的变量,进行各种操作 public native int getInt(Object o, long offset); //设置给定对象上偏移量的int值 public native void putInt(Object o, long offset, int x); //获得给定对象偏移量上的引用类型的值 public native Object getObject(Object o, long offset); //设置给定对象偏移量上的引用类型的值 public native void putObject(Object o, long offset, Object x); //其他基本数据类型(long,char,byte,float,double)的操作与getInthe及putInt相同 //设置给定对象的int值,使用volatile语义,即设置后立马更新到内存对其他线程可见 public native void putIntVolatile(Object o, long offset, int x); //获得给定对象的指定偏移量offset的int值,使用volatile语义,总能获取到最新的int值。 public native int getIntVolatile(Object o, long offset); //其他基本数据类型(long,char,byte,float,double)的操作与putIntVolatile及getIntVolatile相同,引用类型putObjectVolatile也一样。 //与putIntVolatile一样,但要求被操作字段必须有volatile修饰 public native void putOrderedInt(Object o,long offset,int x);
public class UnSafeDemo { public static void main(String[] args) throws NoSuchFieldException, IllegalAccessException, InstantiationException { // 通过反射得到theUnsafe对应的Field对象 Field field = Unsafe.class.getDeclaredField("theUnsafe"); // 设置该Field为可访问 field.setAccessible(true); // 通过Field得到该Field对应的具体对象,传入null是因为该Field为static的 Unsafe unsafe = (Unsafe) field.get(null); System.out.println(unsafe); //通过allocateInstance直接创建对象 User user = (User) unsafe.allocateInstance(User.class); Class userClass = user.getClass(); Field name = userClass.getDeclaredField("name"); Field age = userClass.getDeclaredField("age"); Field id = userClass.getDeclaredField("id"); //获取实例变量name和age在对象内存中的偏移量并设置值 unsafe.putInt(user,unsafe.objectFieldOffset(age),18); unsafe.putObject(user,unsafe.objectFieldOffset(name),"android TV"); // 这里返回 User.class, Object staticBase = unsafe.staticFieldBase(id); System.out.println("staticBase:"+staticBase); //获取静态变量id的偏移量staticOffset long staticOffset = unsafe.staticFieldOffset(userClass.getDeclaredField("id")); //获取静态变量的值 System.out.println("设置前的ID:"+unsafe.getObject(staticBase,staticOffset)); //设置值 unsafe.putObject(staticBase,staticOffset,"SSSSSSSS"); //获取静态变量的值 System.out.println("设置前的ID:"+unsafe.getObject(staticBase,staticOffset)); //输出USER System.out.println("输出USER:"+user.toString()); long data = 1000; byte size = 1;//单位字节 //调用allocateMemory分配内存,并获取内存地址memoryAddress long memoryAddress = unsafe.allocateMemory(size); //直接往内存写入数据 unsafe.putAddress(memoryAddress, data); //获取指定内存地址的数据 long addrData=unsafe.getAddress(memoryAddress); System.out.println("addrData:"+addrData); /** * 输出结果: sun.misc.Unsafe@6f94fa3e staticBase:class geym.conc.ch4.atomic.User 设置前的ID:USER_ID 设置前的ID:SSSSSSSS 输出USER:User{name='android TV', age=18', id=SSSSSSSS'} addrData:1000 */ } } class User{ public User(){ System.out.println("user 构造方法被调用"); } private String name; private int age; private static String id="USER_ID"; @Override public String toString() { return "User{" + "name='" + name + '\'' + ", age=" + age +'\'' + ", id=" + id +'\'' + '}'; } }
public static Unsafe getUnsafe() { Class cc = sun.reflect.Reflection.getCallerClass(2); if (cc.getClassLoader() != null) throw new SecurityException("Unsafe"); return theUnsafe; }
//获取数组第一个元素的偏移地址 public native int arrayBaseOffset(Class arrayClass); //数组中一个元素占据的内存空间,arrayBaseOffset与arrayIndexScale配合使用,可定位数组中每个元素在内存中的位置 public native int arrayIndexScale(Class arrayClass);
CAS 操作相关
CAS是一些CPU直接支持的指令,也就是我们前面分析的无锁操作,在Java中无锁操作CAS基于以下3个方法实现,在稍后讲解Atomic系列内部方法是基于下述方法的实现的。
//第一个参数o为给定对象,offset为对象内存的偏移量,通过这个偏移量迅速定位字段并设置或获取该字段的值, //expected表示期望值,x表示要设置的值,下面3个方法都通过CAS原子指令执行操作。 public final native boolean compareAndSwapObject(Object o, long offset,Object expected, Object x); public final native boolean compareAndSwapInt(Object o, long offset,int expected,int x); public final native boolean compareAndSwapLong(Object o, long offset,long expected,long x);
//1.8新增,给定对象o,根据获取内存偏移量指向的字段,将其增加delta, //这是一个CAS操作过程,直到设置成功方能退出循环,返回旧值 public final int getAndAddInt(Object o, long offset, int delta) { int v; do { //获取内存中最新值 v = getIntVolatile(o, offset); //通过CAS操作 } while (!compareAndSwapInt(o, offset, v, v + delta)); return v; } //1.8新增,方法作用同上,只不过这里操作的long类型数据 public final long getAndAddLong(Object o, long offset, long delta) { long v; do { v = getLongVolatile(o, offset); } while (!compareAndSwapLong(o, offset, v, v + delta)); return v; } //1.8新增,给定对象o,根据获取内存偏移量对于字段,将其 设置为新值newValue, //这是一个CAS操作过程,直到设置成功方能退出循环,返回旧值 public final int getAndSetInt(Object o, long offset, int newValue) { int v; do { v = getIntVolatile(o, offset); } while (!compareAndSwapInt(o, offset, v, newValue)); return v; } // 1.8新增,同上,操作的是long类型 public final long getAndSetLong(Object o, long offset, long newValue) { long v; do { v = getLongVolatile(o, offset); } while (!compareAndSwapLong(o, offset, v, newValue)); return v; } //1.8新增,同上,操作的是引用类型数据 public final Object getAndSetObject(Object o, long offset, Object newValue) { Object v; do { v = getObjectVolatile(o, offset); } while (!compareAndSwapObject(o, offset, v, newValue)); return v; }
挂起与恢复
将一个线程进行挂起是通过park方法实现的,调用 park后,线程将一直阻塞直到超时或者中断等条件出现。unpark可以终止一个挂起的线程,使其恢复正常。Java对线程的挂起操作被封装在 LockSupport类中,LockSupport类中有各种版本pack方法,其底层实现最终还是使用Unsafe.park()方法和Unsafe.unpark()方法
//线程调用该方法,线程将一直阻塞直到超时,或者是中断条件出现。 public native void park(boolean isAbsolute, long time); //终止挂起的线程,恢复正常.java.util.concurrent包中挂起操作都是在LockSupport类实现的,其底层正是使用这两个方法, public native void unpark(Object thread);
//在该方法之前的所有读操作,一定在load屏障之前执行完成 public native void loadFence(); //在该方法之前的所有写操作,一定在store屏障之前执行完成 public native void storeFence(); //在该方法之前的所有读写操作,一定在full屏障之前执行完成,这个内存屏障相当于上面两个的合体功能 public native void fullFence();
//获取持有锁,已不建议使用 @Deprecated public native void monitorEnter(Object var1); //释放锁,已不建议使用 @Deprecated public native void monitorExit(Object var1); //尝试获取锁,已不建议使用 @Deprecated public native boolean tryMonitorEnter(Object var1); //获取本机内存的页数,这个值永远都是2的幂次方 public native int pageSize(); //告诉虚拟机定义了一个没有安全检查的类,默认情况下这个类加载器和保护域来着调用者类 public native Class defineClass(String name, byte[] b, int off, int len, ClassLoader loader, ProtectionDomain protectionDomain); //加载一个匿名类 public native Class defineAnonymousClass(Class hostClass, byte[] data, Object[] cpPatches); //判断是否需要加载一个类 public native boolean shouldBeInitialized(Class<?> c); //确保类一定被加载 public native void ensureClassInitialized(Class<?> c)
AtomicLong:原子更新长整型
public class AtomicInteger extends Number implements java.io.Serializable { private static final long serialVersionUID = 6214790243416807050L; // 获取指针类Unsafe private static final Unsafe unsafe = Unsafe.getUnsafe(); //下述变量value在AtomicInteger实例对象内的内存偏移量 private static final long valueOffset; static { try { //通过unsafe类的objectFieldOffset()方法,获取value变量在对象内存中的偏移 //通过该偏移量valueOffset,unsafe类的内部方法可以获取到变量value对其进行取值或赋值操作 valueOffset = unsafe.objectFieldOffset (AtomicInteger.class.getDeclaredField("value")); } catch (Exception ex) { throw new Error(ex); } } //当前AtomicInteger封装的int变量value private volatile int value; public AtomicInteger(int initialValue) { value = initialValue; } public AtomicInteger() { } //获取当前最新值, public final int get() { return value; } //设置当前值,具备volatile效果,方法用final修饰是为了更进一步的保证线程安全。 public final void set(int newValue) { value = newValue; } //最终会设置成newValue,使用该方法后可能导致其他线程在之后的一小段时间内可以获取到旧值,有点类似于延迟加载 public final void lazySet(int newValue) { unsafe.putOrderedInt(this, valueOffset, newValue); } //设置新值并获取旧值,底层调用的是CAS操作即unsafe.compareAndSwapInt()方法 public final int getAndSet(int newValue) { return unsafe.getAndSetInt(this, valueOffset, newValue); } //如果当前值为expect,则设置为update(当前值指的是value变量) public final boolean compareAndSet(int expect, int update) { return unsafe.compareAndSwapInt(this, valueOffset, expect, update); } //当前值加1返回旧值,底层CAS操作 public final int getAndIncrement() { return unsafe.getAndAddInt(this, valueOffset, 1); } //当前值减1,返回旧值,底层CAS操作 public final int getAndDecrement() { return unsafe.getAndAddInt(this, valueOffset, -1); } //当前值增加delta,返回旧值,底层CAS操作 public final int getAndAdd(int delta) { return unsafe.getAndAddInt(this, valueOffset, delta); } //当前值加1,返回新值,底层CAS操作 public final int incrementAndGet() { return unsafe.getAndAddInt(this, valueOffset, 1) + 1; } //当前值减1,返回新值,底层CAS操作 public final int decrementAndGet() { return unsafe.getAndAddInt(this, valueOffset, -1) - 1; } //当前值增加delta,返回新值,底层CAS操作 public final int addAndGet(int delta) { return unsafe.getAndAddInt(this, valueOffset, delta) + delta; } //省略一些不常用的方法.... }
//当前值加1,返回新值,底层CAS操作 public final int incrementAndGet() { return unsafe.getAndAddInt(this, valueOffset, 1) + 1; }
//Unsafe类中的getAndAddInt方法 public final int getAndAddInt(Object o, long offset, int delta) { int v; do { v = getIntVolatile(o, offset); } while (!compareAndSwapInt(o, offset, v, v + delta)); return v; }
//JDK 1.7的源码,由for的死循环实现,并且直接在AtomicInteger实现该方法, //JDK1.8后,该方法实现已移动到Unsafe类中,直接调用getAndAddInt方法即可 public final int incrementAndGet() { for (;;) { int current = get(); int next = current + 1; if (compareAndSet(current, next)) return next; } }
public class AtomicIntegerDemo { //创建AtomicInteger,用于自增操作 static AtomicInteger i=new AtomicInteger(); public static class AddThread implements Runnable{ public void run(){ for(int k=0;k<10000;k++) i.incrementAndGet(); } } public static void main(String[] args) throws InterruptedException { Thread[] ts=new Thread[10]; //开启10条线程同时执行i的自增操作 for(int k=0;k<10;k++){ ts[k]=new Thread(new AddThread()); } //启动线程 for(int k=0;k<10;k++){ts[k].start();} for(int k=0;k<10;k++){ts[k].join();} System.out.println(i);//输出结果:100000 } }
public class AtomicReferenceDemo2 { public static AtomicReference<User> atomicUserRef = new AtomicReference<User>(); public static void main(String[] args) { User user = new User("zejian", 18); atomicUserRef.set(user); User updateUser = new User("Shine", 25); atomicUserRef.compareAndSet(user, updateUser); //执行结果:User{name='Shine', age=25} System.out.println(atomicUserRef.get().toString()); } static class User { public String name; private int age; public User(String name, int age) { this.name = name; this.age = age; } public String getName() { return name; } @Override public String toString() { return "User{" + "name='" + name + '\'' + ", age=" + age + '}'; } } }
public class AtomicReference<V> implements java.io.Serializable { private static final Unsafe unsafe = Unsafe.getUnsafe(); private static final long valueOffset; static { try { valueOffset = unsafe.objectFieldOffset (AtomicReference.class.getDeclaredField("value")); } catch (Exception ex) { throw new Error(ex); } } //内部变量value,Unsafe类通过valueOffset内存偏移量即可获取该变量 private volatile V value; //CAS方法,间接调用unsafe.compareAndSwapObject(),它是一个 //实现了CAS操作的native方法 public final boolean compareAndSet(V expect, V update) { return unsafe.compareAndSwapObject(this, valueOffset, expect, update); } //设置并获取旧值 public final V getAndSet(V newValue) { return (V)unsafe.getAndSetObject(this, valueOffset, newValue); } //省略其他代码...... } //Unsafe类中的getAndSetObject方法,实际调用还是CAS操作 public final Object getAndSetObject(Object o, long offset, Object newValue) { Object v; do { v = getObjectVolatile(o, offset); } while (!compareAndSwapObject(o, offset, v, newValue)); return v; }
从源码看来,AtomicReference与AtomicInteger的实现原理基本是一样的,最终执行的还是Unsafe类,关于AtomicReference的其他方法也是一样的,如下
AtomicReferenceArray:原子更新引用类型数组里的元素
public class AtomicIntegerArrayDemo { static AtomicIntegerArray arr = new AtomicIntegerArray(10); public static class AddThread implements Runnable{ public void run(){ for(int k=0;k<10000;k++) //执行数组中元素自增操作,参数为index,即数组下标 arr.getAndIncrement(k%arr.length()); } } public static void main(String[] args) throws InterruptedException { Thread[] ts=new Thread[10]; //创建10条线程 for(int k=0;k<10;k++){ ts[k]=new Thread(new AddThread()); } //启动10条线程 for(int k=0;k<10;k++){ts[k].start();} for(int k=0;k<10;k++){ts[k].join();} //执行结果 //[10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000] System.out.println(arr); } }
public class AtomicIntegerArray implements java.io.Serializable { //获取unsafe类的实例对象 private static final Unsafe unsafe = Unsafe.getUnsafe(); //获取数组的第一个元素内存起始地址 private static final int base = unsafe.arrayBaseOffset(int[].class); private static final int shift; //内部数组 private final int[] array; static { //获取数组中一个元素占据的内存空间 int scale = unsafe.arrayIndexScale(int[].class); //判断是否为2的次幂,一般为2的次幂否则抛异常 if ((scale & (scale - 1)) != 0) throw new Error("data type scale not a power of two"); // shift = 31 - Integer.numberOfLeadingZeros(scale); } private long checkedByteOffset(int i) { if (i < 0 || i >= array.length) throw new IndexOutOfBoundsException("index " + i); return byteOffset(i); } //计算数组中每个元素的的内存地址 private static long byteOffset(int i) { return ((long) i << shift) + base; } //省略其他代码...... }
每个数组元素的内存地址=起始地址+元素下标 * 每个元素所占用的内存空间
//计算数组中每个元素的的内存地址 private static long byteOffset(int i) { return ((long) i << shift) + base; }
shift = 31 - Integer.numberOfLeadingZeros(scale);
00000000 00000000 00000000 00000100
//第一个数组元素,index=0 , 其中base为起始地址,4代表int类型占用的字节数 address = base + 0 * 4 即address= base + 0 << 2 //第二个数组元素,index=1 address = base + 1 * 4 即address= base + 1 << 2 //第三个数组元素,index=2 address = base + 2 * 4 即address= base + 2 << 2 //........
address= base + i << shift
//执行自增操作,返回旧值,i是指数组元素下标 public final int getAndIncrement(int i) { return getAndAdd(i, 1); } //指定下标元素执行自增操作,并返回新值 public final int incrementAndGet(int i) { return getAndAdd(i, 1) + 1; } //指定下标元素执行自减操作,并返回新值 public final int decrementAndGet(int i) { return getAndAdd(i, -1) - 1; } //间接调用unsafe.getAndAddInt()方法 public final int getAndAdd(int i, int delta) { return unsafe.getAndAddInt(array, checkedByteOffset(i), delta); } //Unsafe类中的getAndAddInt方法,执行CAS操作 public final int getAndAddInt(Object o, long offset, int delta) { int v; do { v = getIntVolatile(o, offset); } while (!compareAndSwapInt(o, offset, v, v + delta)); return v; }
AtomicReferenceFieldUpdater:原子更新引用类型里的字段。
public class AtomicIntegerFieldUpdaterDemo { public static class Candidate{ int id; volatile int score; } public static class Game{ int id; volatile String name; public Game(int id, String name) { this.id = id; this.name = name; } @Override public String toString() { return "Game{" + "id=" + id + ", name='" + name + '\'' + '}'; } } static AtomicIntegerFieldUpdater<Candidate> atIntegerUpdater = AtomicIntegerFieldUpdater.newUpdater(Candidate.class, "score"); static AtomicReferenceFieldUpdater<Game,String> atRefUpdate = AtomicReferenceFieldUpdater.newUpdater(Game.class,String.class,"name"); //用于验证分数是否正确 public static AtomicInteger allScore=new AtomicInteger(0); public static void main(String[] args) throws InterruptedException { final Candidate stu=new Candidate(); Thread[] t=new Thread[10000]; //开启10000个线程 for(int i = 0 ; i < 10000 ; i++) { t[i]=new Thread() { public void run() { if(Math.random()>0.4){ atIntegerUpdater.incrementAndGet(stu); allScore.incrementAndGet(); } } }; t[i].start(); } for(int i = 0 ; i < 10000 ; i++) { t[i].join();} System.out.println("最终分数score="+stu.score); System.out.println("校验分数allScore="+allScore); //AtomicReferenceFieldUpdater 简单的使用 Game game = new Game(2,"zh"); atRefUpdate.compareAndSet(game,game.name,"JAVA-HHH"); System.out.println(game.toString()); /** * 输出结果: * 最终分数score=5976 校验分数allScore=5976 Game{id=2, name='JAVA-HHH'} */ } }
public abstract class AtomicIntegerFieldUpdater<T> { public static <U> AtomicIntegerFieldUpdater<U> newUpdater(Class<U> tclass, String fieldName) { //实际实现类AtomicIntegerFieldUpdaterImpl return new AtomicIntegerFieldUpdaterImpl<U> (tclass, fieldName, Reflection.getCallerClass()); } }
private static class AtomicIntegerFieldUpdaterImpl<T> extends AtomicIntegerFieldUpdater<T> { private static final Unsafe unsafe = Unsafe.getUnsafe(); private final long offset;//内存偏移量 private final Class<T> tclass; private final Class<?> cclass; AtomicIntegerFieldUpdaterImpl(final Class<T> tclass, final String fieldName, final Class<?> caller) { final Field field;//要修改的字段 final int modifiers;//字段修饰符 try { field = AccessController.doPrivileged( new PrivilegedExceptionAction<Field>() { public Field run() throws NoSuchFieldException { return tclass.getDeclaredField(fieldName);//反射获取字段对象 } }); //获取字段修饰符 modifiers = field.getModifiers(); //对字段的访问权限进行检查,不在访问范围内抛异常 sun.reflect.misc.ReflectUtil.ensureMemberAccess( caller, tclass, null, modifiers); ClassLoader cl = tclass.getClassLoader(); ClassLoader ccl = caller.getClassLoader(); if ((ccl != null) && (ccl != cl) && ((cl == null) || !isAncestor(cl, ccl))) { sun.reflect.misc.ReflectUtil.checkPackageAccess(tclass); } } catch (PrivilegedActionException pae) { throw new RuntimeException(pae.getException()); } catch (Exception ex) { throw new RuntimeException(ex); } Class<?> fieldt = field.getType(); //判断是否为int类型 if (fieldt != int.class) throw new IllegalArgumentException("Must be integer type"); //判断是否被volatile修饰 if (!Modifier.isVolatile(modifiers)) throw new IllegalArgumentException("Must be volatile type"); this.cclass = (Modifier.isProtected(modifiers) && caller != tclass) ? caller : null; this.tclass = tclass; //获取该字段的在对象内存的偏移量,通过内存偏移量可以获取或者修改该字段的值 offset = unsafe.objectFieldOffset(field); } }
public int incrementAndGet(T obj) { int prev, next; do { prev = get(obj); next = prev + 1; //CAS操作 } while (!compareAndSet(obj, prev, next)); return next; } //最终调用的还是unsafe.compareAndSwapInt()方法 public boolean compareAndSet(T obj, int expect, int update) { if (obj == null || obj.getClass() != tclass || cclass != null) fullCheck(obj); return unsafe.compareAndSwapInt(obj, offset, expect, update); }
假设这样一种场景,当第一个线程执行CAS(V,E,U)操作,在获取到当前变量V,准备修改为新值U前,另外两个线程已连续修改了两次变量V的值,使得该值又恢复为旧值,这样的话,我们就无法正确判断这个变量是否已被修改过,如下图
/** * Created by zejian on 2017/7/2. * Blog : http://blog.csdn.net/javazejian [原文地址,请尊重原创] */ public class ABADemo { static AtomicInteger atIn = new AtomicInteger(100); //初始化时需要传入一个初始值和初始时间 static AtomicStampedReference<Integer> atomicStampedR = new AtomicStampedReference<Integer>(200,0); static Thread t1 = new Thread(new Runnable() { @Override public void run() { //更新为200 atIn.compareAndSet(100, 200); //更新为100 atIn.compareAndSet(200, 100); } }); static Thread t2 = new Thread(new Runnable() { @Override public void run() { try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } boolean flag=atIn.compareAndSet(100,500); System.out.println("flag:"+flag+",newValue:"+atIn); } }); static Thread t3 = new Thread(new Runnable() { @Override public void run() { int time=atomicStampedR.getStamp(); //更新为200 atomicStampedR.compareAndSet(100, 200,time,time+1); //更新为100 int time2=atomicStampedR.getStamp(); atomicStampedR.compareAndSet(200, 100,time2,time2+1); } }); static Thread t4 = new Thread(new Runnable() { @Override public void run() { int time = atomicStampedR.getStamp(); System.out.println("sleep 前 t4 time:"+time); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } boolean flag=atomicStampedR.compareAndSet(100,500,time,time+1); System.out.println("flag:"+flag+",newValue:"+atomicStampedR.getReference()); } }); public static void main(String[] args) throws InterruptedException { t1.start(); t2.start(); t1.join(); t2.join(); t3.start(); t4.start(); /** * 输出结果: flag:true,newValue:500 sleep 前 t4 time:0 flag:false,newValue:200 */ } }
public class AtomicStampedReference<V> { //通过Pair内部类存储数据和时间戳 private static class Pair<T> { final T reference; final int stamp; private Pair(T reference, int stamp) { this.reference = reference; this.stamp = stamp; } static <T> Pair<T> of(T reference, int stamp) { return new Pair<T>(reference, stamp); } } //存储数值和时间的内部类 private volatile Pair<V> pair; //构造器,创建时需传入初始值和时间初始值 public AtomicStampedReference(V initialRef, int initialStamp) { pair = Pair.of(initialRef, initialStamp); } }
public boolean compareAndSet(V expectedReference, V newReference, int expectedStamp, int newStamp) { Pair<V> current = pair; return expectedReference == current.reference && expectedStamp == current.stamp && ((newReference == current.reference && newStamp == current.stamp) || casPair(current, Pair.of(newReference, newStamp))); }
private boolean casPair(Pair<V> cmp, Pair<V> val) { return UNSAFE.compareAndSwapObject(this, pairOffset, cmp, val); }
public class ABADemo { static AtomicMarkableReference<Integer> atMarkRef = new AtomicMarkableReference<Integer>(100,false); static Thread t5 = new Thread(new Runnable() { @Override public void run() { boolean mark=atMarkRef.isMarked(); System.out.println("mark:"+mark); //更新为200 System.out.println("t5 result:"+atMarkRef.compareAndSet(atMarkRef.getReference(), 200,mark,!mark)); } }); static Thread t6 = new Thread(new Runnable() { @Override public void run() { boolean mark2=atMarkRef.isMarked(); System.out.println("mark2:"+mark2); System.out.println("t6 result:"+atMarkRef.compareAndSet(atMarkRef.getReference(), 100,mark2,!mark2)); } }); static Thread t7 = new Thread(new Runnable() { @Override public void run() { boolean mark=atMarkRef.isMarked(); System.out.println("sleep 前 t7 mark:"+mark); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } boolean flag=atMarkRef.compareAndSet(100,500,mark,!mark); System.out.println("flag:"+flag+",newValue:"+atMarkRef.getReference()); } }); public static void main(String[] args) throws InterruptedException { t5.start();t5.join(); t6.start();t6.join(); t7.start(); /** * 输出结果: mark:false t5 result:true mark2:true t6 result:true sleep 前 t5 mark:false flag:true,newValue:500 ---->成功了.....说明还是发生ABA问题 */ } }
public class SpinLock { private AtomicReference<Thread> sign =new AtomicReference<>(); public void lock(){ Thread current = Thread.currentThread(); while(!sign .compareAndSet(null, current)){ } } public void unlock (){ Thread current = Thread.currentThread(); sign .compareAndSet(current, null); } }
使用CAS原子操作作为底层实现,lock()方法将要更新的值设置为当前线程,并将预期值设置为null。unlock()函数将要更新的值设置为null,并预期值设置为当前线程。然后我们通过lock()和unlock来控制自旋锁的开启与关闭,注意这是一种非公平锁。事实上AtomicInteger(或者AtomicLong)原子类内部的CAS操作也是通过不断的自循环(while循环)实现,不过这种循环的结束条件是线程成功更新对于的值,但也是自旋锁的一种。
文章转自:http://blog.csdn.net/javazejian/article/details/72772470
0
小飞
19人已关注
领课教育 31991
9813
update 46988
4926
领课教育 17922
husheng 20936
请更新代码 41585
凯哥Java 2191
凯哥Java 2493
凯哥Java 1960