Atomic包之FieldUpdater深度解析

- 7 mins

前言

Java 5 中由Doug Lea大神写的atomic classes 中引入了 Field Updater,本质上来说就是volatile 字段的包装器,下面我们看看该如何使用:

AtomicIntegerFieldUpdater

首先看看类的定义:

public abstract class AtomicIntegerFieldUpdater<T> {

    /**
    *  抽象方法,提供一个静态方法以便得到实例
    */
    @CallerSensitive
    public static <U> AtomicIntegerFieldUpdater<U> newUpdater(Class<U> tclass,
                                                              String fieldName) {
        return new AtomicIntegerFieldUpdaterImpl<U>
            (tclass, fieldName, Reflection.getCallerClass());
    }
}


private static final class AtomicIntegerFieldUpdaterImpl<T>
        extends AtomicIntegerFieldUpdater<T> {
        
    AtomicIntegerFieldUpdaterImpl(final Class<T> tclass,
                                      final String fieldName,
                                      final Class<?> caller) {
            final Field field;
            final int modifiers;
            try {
                field = AccessController.doPrivileged(
                    new PrivilegedExceptionAction<Field>() {
                        public Field run() throws NoSuchFieldException {
                            //字段不存在会抛异常
                            return tclass.getDeclaredField(fieldName);
                        }
                    });
                //检查访问级别,
                modifiers = field.getModifiers();
                sun.reflect.misc.ReflectUtil.ensureMemberAccess(
                    caller, tclass, null, modifiers);
                ClassLoader cl = tclass.getClassLoader();
                ClassLoader ccl = caller.getClassLoader();
                if ((ccl != null) && (ccl != cl) &&
                    ((cl == null) || !isAncestor(cl, ccl))) {
                    sun.reflect.misc.ReflectUtil.checkPackageAccess(tclass);
                }
            } catch (PrivilegedActionException pae) {
                throw new RuntimeException(pae.getException());
            } catch (Exception ex) {
                throw new RuntimeException(ex);
            }

            //必须是int
            if (field.getType() != int.class)
                throw new IllegalArgumentException("Must be integer type");
            //必须用volatile修饰
            if (!Modifier.isVolatile(modifiers))
                throw new IllegalArgumentException("Must be volatile type");

            this.cclass = (Modifier.isProtected(modifiers)) ? caller : tclass;
            this.tclass = tclass;
            //用Unsafe里的那一坨方法去原子更新
            this.offset = U.objectFieldOffset(field);
        }
}

那么有那些使用场景呢,我们可以看到Cassandra中有一些地方用到了,比如:

public abstract class AbstractWriteResponseHandler<T> implements IAsyncCallbackWithFailure<T>{
    private static final AtomicIntegerFieldUpdater<AbstractWriteResponseHandler> failuresUpdater
    = AtomicIntegerFieldUpdater.newUpdater(AbstractWriteResponseHandler.class, "failures");
    
    private volatile int failures = 0;

        public void get() throws WriteTimeoutException, WriteFailureException
    {
        long timeout = currentTimeout();

        boolean success;
        try
        {
            success = condition.await(timeout, TimeUnit.NANOSECONDS);
        }
        catch (InterruptedException ex)
        {
            throw new AssertionError(ex);
        }

        if (!success)
        {
            int blockedFor = totalBlockFor();
            int acks = ackCount();
            if (acks >= blockedFor)
                acks = blockedFor - 1;
            throw new WriteTimeoutException(writeType, consistencyLevel, acks, blockedFor);
        }

        //可以看到平常可以直接访问failures,而不用像AtomicInteger一样还需要调用get方法
        if (totalBlockFor() + failures > totalEndpoints())
        {
            throw new WriteFailureException(consistencyLevel, ackCount(), totalBlockFor(), writeType, failureReasonByEndpoint);
        }
    }

    @Override
    public void onFailure(InetAddress from, RequestFailureReason failureReason)
    {
        logger.trace("Got failure from {}", from);

        //当需要类似AtomicInteger一样的功能时,我们知道volatile执行++是非原子的
        int n = waitingFor(from)
                ? failuresUpdater.incrementAndGet(this)
                : failures;

        failureReasonByEndpoint.put(from, failureReason);

        if (totalBlockFor() + n > totalEndpoints())
            signal();
    }

}

小结

总体来看使用场景不多,有点类似AtomicInteger,但是可以使用比较方便,比如定义了一个变量x,你可以直接正常访问x,而如果使用了AtomicInteger的话,还是需要调用AtomicInteger#get方法,另外如果你又需要原子的get-set的话也可以满足使用场景;另外比如说你有个比较大的链表,内部每个节点都需要原子的get-set,那每个节点都要定义一个AtomicInteger,这样会消耗更多的内存,那么就可以定义个static的AtomicIntegerFieldUpdater。

AtomicReferenceFieldUpdater

和Integer差不多了,只不过这个是用来更新引用的,所以就不过多解读了,Java类库中BufferedInputStream就调用了这个类:

public
class BufferedInputStream extends FilterInputStream {

    protected volatile byte buf[];

    /*
    *  原子的更新内部数组,比如扩容、关闭时,
    */
    private static final
        AtomicReferenceFieldUpdater<BufferedInputStream, byte[]> bufUpdater =
        AtomicReferenceFieldUpdater.newUpdater
        (BufferedInputStream.class,  byte[].class, "buf");


    public void close() throws IOException {
        byte[] buffer;
        while ( (buffer = buf) != null) {
            //放在一个循环中,如果CAS更新失败,那么就读取最新的buf引用,继续CAS更新
            if (bufUpdater.compareAndSet(this, buffer, null)) {
                InputStream input = in;
                in = null;
                if (input != null)
                    input.close();
                return;
            }
        }
    }
}

另外还有JDK1.7之前的ConcurrentLinkedQueue,也有了这个机制,但是可以1.7之后切换到了Unsafe的方案, 主要还是因为性能原因,FieldUpdater的方案需要使用反射API配合,而Unsafe不用,而且有些JVM会把Unsafe的调用内联,理论上看会快很多(Todo :待测试)

//JDK1.6
private static class Node<E> {
        private volatile E item;
        private volatile Node<E> next;

        private static final
            AtomicReferenceFieldUpdater<Node, Node>
            nextUpdater =
            AtomicReferenceFieldUpdater.newUpdater
            (Node.class, Node.class, "next");
        private static final
            AtomicReferenceFieldUpdater<Node, Object>
            itemUpdater =
            AtomicReferenceFieldUpdater.newUpdater
            (Node.class, Object.class, "item");
}
//JDK1.8
private static class Node<E> {
        volatile E item;
        volatile Node<E> next;

        /**
         * Constructs a new node.  Uses relaxed write because item can
         * only be seen after publication via casNext.
         */
        Node(E item) {
            UNSAFE.putObject(this, itemOffset, item);
        }

        boolean casItem(E cmp, E val) {
            return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
        }

        void lazySetNext(Node<E> val) {
            UNSAFE.putOrderedObject(this, nextOffset, val);
        }

        boolean casNext(Node<E> cmp, Node<E> val) {
            return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
        }

        // Unsafe mechanics

        private static final sun.misc.Unsafe UNSAFE;
        private static final long itemOffset;
        private static final long nextOffset;

        static {
            try {
                UNSAFE = sun.misc.Unsafe.getUnsafe();
                Class<?> k = Node.class;
                itemOffset = UNSAFE.objectFieldOffset
                    (k.getDeclaredField("item"));
                nextOffset = UNSAFE.objectFieldOffset
                    (k.getDeclaredField("next"));
            } catch (Exception e) {
                throw new Error(e);
            }
        }
    }

总结

总的来说,JDK类库里面使用较多一点,大部分场景直接使用JUC下面类似ConcurrentHashMap这些就够了,真遇到性能不满足的场景,再根据Profile的结果针对性优化。

comments powered by Disqus
rss facebook twitter github youtube mail spotify lastfm instagram linkedin google google-plus pinterest medium vimeo stackoverflow reddit quora quora