• 微信公众号:美女很有趣。 工作之余,放松一下,关注即送10G+美女照片!

【Flink内存管理】初步认识

开发技术 开发技术 4小时前 1次浏览

一、很多大数据工具是基于jvm,Flink使用自主内存管理,这两者有什么区别

  1.jvm存储数据密度低,它包含三个部分,对象头,实例对象,对齐填充;java高级语言,很多事不用人为去做,自动去处理,偏底层自己做,就叫低级语言;

       2.FullGC会极大的影响性能,尤其为了处理大数据而开了很大的内存空间的JVM,一次GC甚至会达到分钟级;

       3.OOM问题影响稳定性,JVM所有对象的大小大于JVM所分配的内存大小,发生内存溢出

       4.缓存未命中,CPU计算是从CPU缓存获取的,是以缓存line加载,是连续的,而Jvm对象在堆上存储不是连续的

 

二、内存模型 【在1.10版本做了重大改变】

  jobManage内存模型:

 

  taskManage内存模型:在配置文件中说明配置项 Flink内存,进程内存

  堆上内存,堆外内存

 

  在YarnClusterDescriptor类中: JobManagerProcessUtils

final JobManagerProcessSpec processSpec = JobManagerProcessUtils.processSpecFromConfigWithNewOptionToInterpretLegacyHeap(
			flinkConfiguration,
			JobManagerOptions.TOTAL_PROCESS_MEMORY);

  点进去找到

static JobManagerProcessSpec processSpecFromConfig(Configuration config) {
		return createMemoryProcessSpec(PROCESS_MEMORY_UTILS.memoryProcessSpecFromConfig(config));
	}

  点进配置 :判断不同配置进入不同的模式

public CommonProcessMemorySpec<FM> memoryProcessSpecFromConfig(Configuration config) {
		if (options.getRequiredFineGrainedOptions().stream().allMatch(config::contains)) {
			// all internal memory options are configured, use these to derive total Flink and process memory
			return deriveProcessSpecWithExplicitInternalMemory(config);
		} else if (config.contains(options.getTotalFlinkMemoryOption())) {
			// internal memory options are not configured, total Flink memory is configured,
			// derive from total flink memory
			return deriveProcessSpecWithTotalFlinkMemory(config);
		} else if (config.contains(options.getTotalProcessMemoryOption())) {
			// total Flink memory is not configured, total process memory is configured,
			// derive from total process memory
			return deriveProcessSpecWithTotalProcessMemory(config);
		}
		return failBecauseRequiredOptionsNotConfigured();

  这个地方就是给JobManage赋值

private static JobManagerFlinkMemory createJobManagerFlinkMemory(
			MemorySize jvmHeap,
			MemorySize offHeapMemory) {
		verifyJvmHeapSize(jvmHeap);
		return new JobManagerFlinkMemory(jvmHeap, offHeapMemory);
	}

  再看TaskManage内存分配

  有个申请资源的过程:ActiveResourceManage这个方法 requestNewWorker

       

final TaskExecutorProcessSpec taskExecutorProcessSpec =
				TaskExecutorProcessUtils.processSpecFromWorkerResourceSpec(flinkConfig, workerResourceSpec);

  new了一个

		final TaskExecutorFlinkMemory flinkMemory = new TaskExecutorFlinkMemory(
			frameworkHeapMemorySize,
			frameworkOffHeapMemorySize,
			workerResourceSpec.getTaskHeapSize(),
			workerResourceSpec.getTaskOffHeapSize(),
			workerResourceSpec.getNetworkMemSize(),
			workerResourceSpec.getManagedMemSize());

 二、内存数据结构

      1.内存段,最小的内存分配单元,默认32k,既可以是堆上也可以是堆外,堆上是字节数组,堆外主要是网络缓冲区,提供了对二进制数据的读写。TypeInformation封装了很多。主要使用混合的MemorySegment,堆上堆外都能用。展示一个Tuple3<Integer,Double,Person>对象的序列化,存储很紧密,使用效率高了,会对对象序列化

      2.内存页,是内存段之上的数据访问视图,使用时无需关系内存段的细节

      3.buffer,task算子之间在网络数据传输使用的是buffer。使用buffer,申请和释放由flink自行管理,一个networkbuffer封装了一个memorySegment。

      4.buffer池,bufferPool管理buffer,申请,释放,销毁等。

三、内存管理器

     在1.10管理taskManage

     在1.10版本之后管理slot级别,allocate分配资源

 

四、网络传输的内存管理

  【Flink内存管理】初步认识

 

 每个图有他的上流下流,输入大门,输出是结果分区,每个task都有本地缓冲池,最终向网络缓冲池,网络缓冲池,可以不同节点通讯。

 

最后一个背压机制record,后面的task积压,层层传递到sourcetask,credit信用机制。网络是基于netty。


程序员灯塔
转载请注明原文链接:【Flink内存管理】初步认识
喜欢 (0)