• 如果您觉得本站非常有看点,那么赶紧使用Ctrl+D 收藏吧

netty-池化内存-PoolArean

互联网 diligentman 6天前 7次浏览

PoolArena

管理PoolChunk,PoolSubpage,PoolChunkList的类

netty-池化内存-PoolArean

源码

属性

 static final int numTinySubpagePools = 512 >>> 4;  //tiny数组个数,16字节的整数倍

    final PooledByteBufAllocator parent;

    private final int maxOrder;//PoolChunk 二叉树最大层树
    final int pageSize; //PoolChunk每页大小 8192K
    final int pageShifts;//用来运算二叉树的高度 = log2(pageSize)
    final int chunkSize; //PoolChunk总内存
    final int subpageOverflowMask;
    final int numSmallSubpagePools;//samll数组个数
    final int directMemoryCacheAlignment;//内存对齐
    final int directMemoryCacheAlignmentMask;
    private final PoolSubpage<T>[] tinySubpagePools;
    private final PoolSubpage<T>[] smallSubpagePools;

    private final PoolChunkList<T> q050;//管理PoolChunk 
    private final PoolChunkList<T> q025;//管理PoolChunk 
    private final PoolChunkList<T> q000;//管理PoolChunk 
    private final PoolChunkList<T> qInit;//管理PoolChunk 
    private final PoolChunkList<T> q075;//管理PoolChunk 
    private final PoolChunkList<T> q100;//管理PoolChunk 

    private final List<PoolChunkListMetric> chunkListMetrics;

    // Metrics for allocations and deallocations
    private long allocationsNormal;
    // We need to use the LongCounter here as this is not guarded via synchronized block.
    private final LongCounter allocationsTiny = PlatformDependent.newLongCounter();
    private final LongCounter allocationsSmall = PlatformDependent.newLongCounter();
    private final LongCounter allocationsHuge = PlatformDependent.newLongCounter();
    private final LongCounter activeBytesHuge = PlatformDependent.newLongCounter();

    private long deallocationsTiny;
    private long deallocationsSmall;
    private long deallocationsNormal;

    // We need to use the LongCounter here as this is not guarded via synchronized block.
    private final LongCounter deallocationsHuge = PlatformDependent.newLongCounter();

    // Number of thread caches backed by this arena.
    final AtomicInteger numThreadCaches = new AtomicInteger();

重要方法

//计算需要申请的容量
int normalizeCapacity(int reqCapacity) {
        if (reqCapacity < 0) {
            throw new IllegalArgumentException("capacity: " + reqCapacity + " (expected: 0+)");
        }

        if (reqCapacity >= chunkSize) {//比chunkSize大,只需要按cacheAlignment对齐就可以了
            return directMemoryCacheAlignment == 0 ? reqCapacity : alignCapacity(reqCapacity);
        }

        if (!isTiny(reqCapacity)) { // >= 512  大于512小chunkSize
            // Doubled

            int normalizedCapacity = reqCapacity;
            normalizedCapacity --; //减1的目的是为了防止本身就是2的N次,算出的结果比原结果大1倍
            normalizedCapacity |= normalizedCapacity >>>  1;  //int为32位,4次就可以搞定,右移一位,相当于最高位的1右移一位,总共处理2位
            normalizedCapacity |= normalizedCapacity >>>  2;//总共处理4位
            normalizedCapacity |= normalizedCapacity >>>  4;//总共处理8位
            normalizedCapacity |= normalizedCapacity >>>  8;//总共处理16位
            normalizedCapacity |= normalizedCapacity >>> 16;//总共处理32位
            normalizedCapacity ++;//+1 变成2的N次方

            if (normalizedCapacity < 0) { // 这里应该不可能为负的
                normalizedCapacity >>>= 1; //如果是负数,无符号右移一位,变成正数
            }
            assert directMemoryCacheAlignment == 0 || (normalizedCapacity & directMemoryCacheAlignmentMask) == 0;

            return normalizedCapacity;
        }

        if (directMemoryCacheAlignment > 0) { //小于 512字节,如果有对齐,优先按对齐需求
            return alignCapacity(reqCapacity);
        }

        // Quantum-spaced
        if ((reqCapacity & 15) == 0) { //没有额外的对齐需求,按16位对齐
            return reqCapacity;
        }

        return (reqCapacity & ~15) + 16;//没有额外的对齐需求,按16位对齐
    }

内存分配

 private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) {
        final int normCapacity = normalizeCapacity(reqCapacity);//处理申请容量
        if (isTinyOrSmall(normCapacity)) { // capacity < pageSize
            int tableIdx;
            PoolSubpage<T>[] table;
            boolean tiny = isTiny(normCapacity);
            if (tiny) { // < 512
                if (cache.allocateTiny(this, buf, reqCapacity, normCapacity)) {//从缓存中直接分配
                    // was able to allocate out of the cache so move on
                    return;
                }
                tableIdx = tinyIdx(normCapacity);
                table = tinySubpagePools;
            } else {
                if (cache.allocateSmall(this, buf, reqCapacity, normCapacity)) {//从缓存中直接分配
                    // was able to allocate out of the cache so move on
                    return;
                }
                tableIdx = smallIdx(normCapacity);
                table = smallSubpagePools;
            }
           // 缓存中分配失败,从数组中找到该申请容量对应的下标,每个容量大小都会有唯一的数组下标,不同的容量对应不同的元素
            final PoolSubpage<T> head = table[tableIdx];

            /**
             * Synchronize on the head. This is needed as {@link PoolChunk#allocateSubpage(int)} and
             * {@link PoolChunk#free(long)} may modify the doubly linked list as well.
             */
            synchronized (head) {
                final PoolSubpage<T> s = head.next;//存在next,说明已经该大小的容量已经被分配过一次了
                if (s != head) {
                    assert s.doNotDestroy && s.elemSize == normCapacity;
                    long handle = s.allocate();//分配内存,得到内存地址
                    assert handle >= 0;
                    s.chunk.initBufWithSubpage(buf, handle, reqCapacity);//将内存地址设置到ByteBuf
                    incTinySmallAllocation(tiny);//分配次数累加一次
                    return;
                }
            }
            synchronized (this) {//一次都未分配,或者分配过,但是被踢除了,比如空间满了
                allocateNormal(buf, reqCapacity, normCapacity);
            }

            incTinySmallAllocation(tiny);
            return;
        }
        if (normCapacity <= chunkSize) {
            if (cache.allocateNormal(this, buf, reqCapacity, normCapacity)) {
                // was able to allocate out of the cache so move on
                return;
            }
            synchronized (this) {
                allocateNormal(buf, reqCapacity, normCapacity);
                ++allocationsNormal;
            }
        } else {
            // Huge allocations are never served via the cache so just call allocateHuge
            allocateHuge(buf, reqCapacity);//分配大对象
        }
    }
    private void allocateNormal(PooledByteBuf<T> buf, int reqCapacity, int normCapacity) {
       //先看看是否有分配过
        if (q050.allocate(buf, reqCapacity, normCapacity) || q025.allocate(buf, reqCapacity, normCapacity) ||
            q000.allocate(buf, reqCapacity, normCapacity) || qInit.allocate(buf, reqCapacity, normCapacity) ||
            q075.allocate(buf, reqCapacity, normCapacity)) {
            return;
        }

        // Add a new chunk.
      //重新向系统申请内存
        PoolChunk<T> c = newChunk(pageSize, maxOrder, pageShifts, chunkSize);
        long handle = c.allocate(normCapacity);
        assert handle > 0;
        c.initBuf(buf, handle, reqCapacity);
        qInit.add(c);
    }

分配大对象,大对象不需要运算,直接把整个申请的内存扔给ByteBuf使用

    private void allocateHuge(PooledByteBuf<T> buf, int reqCapacity) {
        PoolChunk<T> chunk = newUnpooledChunk(reqCapacity);//大对象不池化
        activeBytesHuge.add(chunk.chunkSize());
        buf.initUnpooled(chunk, reqCapacity);
        allocationsHuge.increment();
    }

内存回收

void free(PoolChunk<T> chunk, long handle, int normCapacity, PoolThreadCache cache) {
        if (chunk.unpooled) {
            int size = chunk.chunkSize();
            destroyChunk(chunk);
            activeBytesHuge.add(-size);//未池化的,只有大对象
            deallocationsHuge.increment();
        } else {
            SizeClass sizeClass = sizeClass(normCapacity);
           //添加到缓存中,方便下次分配,缓存有容量限制,放不下去,就忽略
            if (cache != null && cache.add(this, chunk, handle, normCapacity, sizeClass)) {
                // cached so not free it.
                return;
            }

            freeChunk(chunk, handle, sizeClass);
        }
    }
    void freeChunk(PoolChunk<T> chunk, long handle, SizeClass sizeClass) {
        final boolean destroyChunk;
        synchronized (this) {
            switch (sizeClass) {
            case Normal:
                ++deallocationsNormal;
                break;
            case Small:
                ++deallocationsSmall;
                break;
            case Tiny:
                ++deallocationsTiny;
                break;
            default:
                throw new Error();
            }
            destroyChunk = !chunk.parent.free(chunk, handle);//通知PoolChunkList回收该PoolChunk。根据使用率转移到合适的PoolChunkList里面
        }
        if (destroyChunk) {//转移失败,说明使用率为0,等待系统回收
            // destroyChunk not need to be called while holding the synchronized lock.
            destroyChunk(chunk);
        }
    }

重新分配

应用场景,ByteBuf 扩容

    void reallocate(PooledByteBuf<T> buf, int newCapacity, boolean freeOldMemory) {
        if (newCapacity < 0 || newCapacity > buf.maxCapacity()) {
            throw new IllegalArgumentException("newCapacity: " + newCapacity);
        }

        int oldCapacity = buf.length;
        if (oldCapacity == newCapacity) {
            return;
        }

        PoolChunk<T> oldChunk = buf.chunk;
        long oldHandle = buf.handle;
        T oldMemory = buf.memory;
        int oldOffset = buf.offset;
        int oldMaxLength = buf.maxLength;
        int readerIndex = buf.readerIndex();
        int writerIndex = buf.writerIndex();

        allocate(parent.threadCache(), buf, newCapacity);
        if (newCapacity > oldCapacity) {
            memoryCopy(
                    oldMemory, oldOffset,
                    buf.memory, buf.offset, oldCapacity);
        } else if (newCapacity < oldCapacity) {
            if (readerIndex < newCapacity) {
                if (writerIndex > newCapacity) {
                    writerIndex = newCapacity;
                }
                memoryCopy(
                        oldMemory, oldOffset + readerIndex,
                        buf.memory, buf.offset + readerIndex, writerIndex - readerIndex);
            } else {
                readerIndex = writerIndex = newCapacity;
            }
        }

        buf.setIndex(readerIndex, writerIndex);

        if (freeOldMemory) {
            free(oldChunk, oldHandle, oldMaxLength, buf.cache);
        }
    }


喜欢 (0)