泰晓科技 -- 聚焦 Linux - 追本溯源,见微知著!
网站地址:https://tinylab.org

泰晓RISC-V实验箱,转战RISC-V,开箱即用
请稍侯

eBPF 程序装载、翻译与运行过程详解

Wu Daemon 创作于 2021/05/19

By Wu Daemon of TinyLab.org 2020/2/20

前言

上一篇 我们讲述了 eBPF 框架,了解了 BPF 程序的组成,大致分析了 bpf 指令和 map 数据通信,本节来分析下调用流程。

BPF 程序格式为 ELF

加载 bpf 程序实质上是加载 ELF 格式文件,Linux 加载普通 ELF 格式的文件在通过 load_elf_binary 来实现,而 Linux 加载 bpf elf 其实在用户态实现的,使用的是开源的 libelf 库实现的,调用过程不太一样,而且只是把 ELF 格式的指令 dump 出来,接下来还需要 JIT 编译器翻译出机器汇编码才能执行,这个调用过程比 Linux 加载普通 ELF 格式文件简单。

libelf 库实现的各个 API 可参考如下 链接 以及我们专门为此准备的 libelf 开源库用法详解

ELF 格式的详解可参考社区创始人撰写的开源书籍 C 语言编程透视 和配套视频课程《360° 剖析 Linux ELF》

ELF 文件大体结构

ELF 文件大体结构如下所示,包含 ELF 头部,程序头表,各个段和程序段表:

ELF Header		 #程序头,有该文件的Magic number(参考man magic),类型等
Program Header Table	 #对可执行文件和共享库有效,它描述下面各个节(section)组成的段
Section1
Section2
Section3
.....
Program Section Table	#仅对可重定位目标文件和静态库有效,用于描述各个Section的重定位信息等。

BPF 程序 Section 解析:get_sec

samples/bpf/bpf_load.c 中通过 get_sec 函数调用 libelf 库的 API 获取 section 内容,其中第四个参数是传入段的名字,最后一个参数获得的是该段的数据:

static int get_sec(Elf *elf, int i, GElf_Ehdr *ehdr, char **shname,
		   GElf_Shdr *shdr, Elf_Data **data)
{
	Elf_Scn *scn;

	scn = elf_getscn(elf, i);  //从elf描述符获取按照节索引获取节接口
	if (!scn)
		return 1;

	if (gelf_getshdr(scn, shdr) != shdr) // 通过节结构复制节表头
		return 2;

	*shname = elf_strptr(elf, ehdr->e_shstrndx, shdr->sh_name); //	从指定的字符串表中通过偏移获取字符串
	if (!*shname || !shdr->sh_size)
		return 3;

	*data = elf_getdata(scn, 0);  //从节中获取节数据(经过了字节序的转换)
	if (!*data || elf_getdata(scn, *data) != NULL)
		return 4;

	return 0;
}

BPF 程序装载与解析:load_bpf_file

tracex4_user.c 通过 load_bpf_file 加载 .o 文件,我们来分析一下。

load_bpf_file 实质是调用 do_load_bpf_file,在这个函数里首先打开 .o 文件,do_load_bpf_file 会将输入的 .o 文件作为 ELF 格式文件,逐个 section 进行分析:

  • 如 section 的名字是特殊的(比如 ‘kprobe’),那么就会将这个 section 的内容作为 load_and_attach 的参数。
  • 如 section 的名字是 “license” 或 “version” 则保存 license 或 version。
  • 如 section 是 map 则解析出 map 段

samples/bpf/bpfload.c 中的相关代码如下:

static int do_load_bpf_file(const char *path, fixup_map_cb fixup_map)
{
	int fd, i, ret, maps_shndx = -1, strtabidx = -1;
	Elf *elf;
	GElf_Ehdr ehdr;
	GElf_Shdr shdr, shdr_prog;
	Elf_Data *data, *data_prog, *data_maps = NULL, *symbols = NULL;
	char *shname, *shname_prog;
	int nr_maps = 0;
	... ...
	fd = open(path, O_RDONLY, 0);  //打开elf文件
	if (fd < 0)
		return 1;

	elf = elf_begin(fd, ELF_C_READ, NULL);//获取elf描述符,使用‘读取’的方式
	... ...
	if (gelf_getehdr(elf, &ehdr) != &ehdr)	//获取elf文件头副本
		return 1;
	... ...
	/* scan over all elf sections to get license and map info */
	for (i = 1; i < ehdr.e_shnum; i++) {		       //遍历各个section
		if (get_sec(elf, i, &ehdr, &shname, &shdr, &data))  // shname 为"section"的名字
			continue;

		if (0) /* helpful for llvm debugging */        //打印各个section 对应的数据保存在data->d_buf中
			printf("section %d:%s data %p size %zd link %d flags %d\n",
			i, shname, data->d_buf, data->d_size,
			shdr.sh_link, (int) shdr.sh_flags);

		if (strcmp(shname, "license") == 0) {	     //如果是"license"段
			processed_sec[i] = true;
			memcpy(license, data->d_buf, data->d_size); //把 data->d_buf 拷贝到license数组
		} else if (strcmp(shname, "version") == 0) { //如果是"version"段
			processed_sec[i] = true;
			if (data->d_size != sizeof(int)) {
				printf("invalid size of version section %zd\n",
				data->d_size);
				return 1;
		}
		memcpy(&kern_version, data->d_buf, sizeof(int));//把 data->d_buf 拷贝到kern_version变量
	} else if (strcmp(shname, "maps") == 0) {      //如果是map 段
		int j;

		maps_shndx = i;
		data_maps = data;
		for (j = 0; j < MAX_MAPS; j++)
			map_data[j].fd = -1;
	} else if (shdr.sh_type == SHT_SYMTAB) {
		strtabidx = shdr.sh_link;
		symbols = data;
	}

	... ...

	if (data_maps) {     //对map段的处理
		nr_maps = load_elf_maps_section(map_data, maps_shndx,elf, symbols, strtabidx);	 //获取map段内容
		if (nr_maps < 0) {
			printf("Error: Failed loading ELF maps (errno:%d):%s\n",
			nr_maps, strerror(-nr_maps));
			goto done;
		}
		if (load_maps(map_data, nr_maps, fixup_map))  //这里加载map
			goto done;
		map_data_count = nr_maps;

		processed_sec[maps_shndx] = true;
	}

	/* process all relo sections, and rewrite bpf insns for maps */
	for (i = 1; i < ehdr.e_shnum; i++) {  //遍历所有的重定向段,
		if (processed_sec[i])  ////flag 置位表示已经是处理了的段 ,跳过去
			continue;

		if (get_sec(elf, i, &ehdr, &shname, &shdr, &data))
			continue;

		if (shdr.sh_type == SHT_REL) {
			struct bpf_insn *insns;

			/* locate prog sec that need map fixup (relocations) */
			if (get_sec(elf, shdr.sh_info, &ehdr, &shname_prog,
				&shdr_prog, &data_prog))  //该段保存到data_prog
				continue;

			if (shdr_prog.sh_type != SHT_PROGBITS ||
			!(shdr_prog.sh_flags & SHF_EXECINSTR))
				continue;

			insns = (struct bpf_insn *) data_prog->d_buf;  //得到bpf字节码对应的结构体
			processed_sec[i] = true; /* relo section */

			if (parse_relo_and_apply(data, symbols, &shdr, insns,
						map_data, nr_maps))
				continue;
		}
	}

	/* load programs */
	for (i = 1; i < ehdr.e_shnum; i++) {
		if (processed_sec[i])  //flag 置位表示已经是处理了的段 ,跳过去
			continue;

		if (get_sec(elf, i, &ehdr, &shname, &shdr, &data))
			continue;

		if (memcmp(shname, "kprobe/", 7) == 0 ||
			memcmp(shname, "kretprobe/", 10) == 0 ||
			memcmp(shname, "tracepoint/", 11) == 0 ||
			memcmp(shname, "raw_tracepoint/", 15) == 0 ||
			memcmp(shname, "xdp", 3) == 0 ||
			memcmp(shname, "perf_event", 10) == 0 ||
			memcmp(shname, "socket", 6) == 0 ||
			memcmp(shname, "cgroup/", 7) == 0 ||
			memcmp(shname, "sockops", 7) == 0 ||
			memcmp(shname, "sk_skb", 6) == 0 ||
			memcmp(shname, "sk_msg", 6) == 0) {
			ret = load_and_attach(shname, data->d_buf,
					data->d_size);  //事件类型  字节码 字节码大小
			if (ret != 0)
				goto done;
		}
	}

done:
	close(fd);
	return ret;
}

打开 ELF 调试 log,可以得到该 ELF 文件各个段的内容首地址,大小,属性等信息。

wu@ubuntu:~/linux/samples/bpf$ sudo ./tracex4
[sudo] password for wu:
section 1:.strtab data 0x556034a3d070 size 277 link 0 flags 0
section 3:kprobe/kmem_cache_free data 0x556034a3d5a0 size 72 link 0 flags 6
section 4:.relkprobe/kmem_cache_free data 0x556034a3d5f0 size 16 link 26 flags 0
section 5:kretprobe/kmem_cache_alloc_node data 0x556034a3d610 size 192 link 0 flags 6
section 6:.relkretprobe/kmem_cache_alloc_node data 0x556034a3d6e0 size 16 link 26 flags 0
section 7:maps data 0x556034a3d700 size 28 link 0 flags 3
section 8:license data 0x556034a3d730 size 4 link 0 flags 3
section 9:version data 0x556034a3d750 size 4 link 0 flags 3
section 10:.debug_str data 0x556034a3d770 size 489 link 0 flags 48
section 11:.debug_loc data 0x556034a3d970 size 336 link 0 flags 0
section 12:.rel.debug_loc data 0x556034a3dad0 size 80 link 26 flags 0
section 13:.debug_abbrev data 0x556034a3db30 size 257 link 0 flags 0
section 14:.debug_info data 0x556034a3dc40 size 886 link 0 flags 0
section 15:.rel.debug_info data 0x556034a3dfc0 size 1200 link 26 flags 0
section 16:.debug_ranges data 0x556034a3e480 size 48 link 0 flags 0
section 17:.rel.debug_ranges data 0x556034a3e4c0 size 64 link 26 flags 0
section 18:.BTF data 0x556034a3e510 size 1384 link 0 flags 0
section 19:.rel.BTF data 0x556034a3ea80 size 48 link 26 flags 0
section 20:.BTF.ext data 0x556034a3eac0 size 376 link 0 flags 0
section 21:.rel.BTF.ext data 0x556034a3ec40 size 320 link 26 flags 0
section 22:.eh_frame data 0x556034a3ed90 size 80 link 0 flags 2
section 23:.rel.eh_frame data 0x556034a3edf0 size 32 link 26 flags 0
section 24:.debug_line data 0x556034a3ee20 size 327 link 0 flags 0
section 25:.rel.debug_line data 0x556034a3ef70 size 32 link 26 flags 0
section 26:.symtab data 0x556034a3efa0 size 1704 link 1 flags 0

BPF 字节码加载过程

接下来调用 load_and_attach,第一个参数是 event,本例就是 “kprobe/” ,第二个参数是 bpf 字节码,第三个参数是字节码大小。

BPF 指令结构

bpf_insn 是一个结构体,代表一条 eBPF 指令,包含 5 个字段组成:

struct bpf_insn {
    __u8    code;	 /* opcode */
    __u8    dst_reg:4;	  /* dest register */
    __u8    src_reg:4;	  /* source register */
    __s16    off;	 /* signed offset */
    __s32    imm;	 /* signed immediate constant */
};

每一个 eBPF 程序都是由若干个 bpf 指令构成,就是一个一个 bpf_insn 数组,使用 bpf 系统调用将其载入内核。

把 BPF 字节码装载到内核空间

接着调用 bpf_load_program,填入的参数为程序类型 prog_type, 和虚拟机指令 insns_cnt 等。

如果判断 events 是 kprobe/kretprobe,那么填充 buf 为 debugfs 相关路径。打开该路径,然后调用 sys_perf_event_open ioctl 设置等,这个和 strace 追踪到的调用过程基本一致。

static int load_and_attach(const char *event, struct bpf_insn *prog, int size)
{
    bool is_socket = strncmp(event, "socket", 6) == 0;


    ......

    fd = bpf_load_program(prog_type, prog, insns_cnt, license, kern_version,
			    bpf_log_buf, BPF_LOG_BUF_SIZE);

    ......
	   if (is_kprobe || is_kretprobe) {
		bool need_normal_check = true;
		const char *event_prefix = "";

		if (is_kprobe)
			event += 7;
		else
			event += 10;

		if (*event == 0) {
			printf("event name cannot be empty\n");
			return -1;
		}

		if (isdigit(*event))
			return populate_prog_array(event, fd);

#ifdef __x86_64__
		if (strncmp(event, "sys_", 4) == 0) {
			snprintf(buf, sizeof(buf), "%c:__x64_%s __x64_%s",
				is_kprobe ? 'p' : 'r', event, event);
			err = write_kprobe_events(buf);
			if (err >= 0) {
				need_normal_check = false;
				event_prefix = "__x64_";
			}
		}
#endif
		if (need_normal_check) {
			snprintf(buf, sizeof(buf), "%c:%s %s",
				is_kprobe ? 'p' : 'r', event, event);
			err = write_kprobe_events(buf);
			if (err < 0) {
				printf("failed to create kprobe '%s' error '%s'\n",
				       event, strerror(errno));
				return -1;
			}
		}

		strcpy(buf, DEBUGFS);
		strcat(buf, "events/kprobes/");
		strcat(buf, event_prefix);
		strcat(buf, event);
		strcat(buf, "/id");
	}

	efd = open(buf, O_RDONLY, 0);
	if (efd < 0) {
		printf("failed to open event %s\n", event);
		return -1;
	}

	err = read(efd, buf, sizeof(buf));
	if (err < 0 || err >= sizeof(buf)) {
		printf("read from '%s' failed '%s'\n", event, strerror(errno));
		return -1;
	}

	close(efd);

	buf[err] = 0;
	id = atoi(buf);
	attr.config = id;

	efd = sys_perf_event_open(&attr, -1/*pid*/, 0/*cpu*/, -1/*group_fd*/, 0);
	... ...
	event_fd[prog_cnt - 1] = efd;
	err = ioctl(efd, PERF_EVENT_IOC_ENABLE, 0);
	... ...
	err = ioctl(efd, PERF_EVENT_IOC_SET_BPF, fd);
	... ...

	return 0;
}

其中 bpf_load_program 会通过 BPF_PROG_LOAD 系统调用,将字节码传入内核,返回一个文件描述符 fdattr->insns 就是下面这种 bpf 字节码:

code=BPF_ALU64|BPF_X|BPF_MOV, dst_reg=BPF_REG_6, src_reg=BPF_REG_1, off=0, imm=0

kernel/bpf/syscall.c 中定义的相应系统调用如下:

SYSCALL_DEFINE3(bpf, int, cmd, union bpf_attr __user *, uattr, unsigned int, size)
{
    ......
	case BPF_MAP_CREATE:
		err = map_create(&attr);
		break;
    case BPF_PROG_LOAD:
	err = bpf_prog_load(&attr);  //attr包含字节码
    ... ...
}

bpf_prog_load 真正的加载 bpf 字节码,首先从 bpf 字节码中获得 license,判断是不是 GPL license。

然后分配内核 bpf_prog 程序数据结构空间,将 bpf 虚拟机指令从用户空间拷贝到内核空间,把指令保存在 struct bpf_prog 结构体中。

然后运行 bpf_check 验证 bpf 指令在注入内核是否安全,比如检查栈是否会溢出,除数是否为零,否则不检测安不安全容易造成内核 panic 等严重问题,这一部分内容很多,就暂时不分析了。

把 BPF 字节码翻译为机器码

验证通过之后,核心调用是运行 bpf_prog_select_runtime 里的 do_jit 把 bpf 字节码转换成机器汇编码,最后运行 bpf_prog_kallsyms_add 将机器汇编码添加到 kallsyms,在 /proc/kallsyms 中会看到 bpf 程序的符号表:

static int bpf_prog_load(union bpf_attr *attr)
{
	enum bpf_prog_type type = attr->prog_type;
	struct bpf_prog *prog;
	int err;
	char license[128];
	bool is_gpl;
	... ...
	/* copy eBPF program license from user space */
	if (strncpy_from_user(license, u64_to_user_ptr(attr->license),
			sizeof(license) - 1) < 0)  //拷贝license  attr->license
		return -EFAULT;
	license[sizeof(license) - 1] = 0;  //最后一位设空字符

	/* eBPF programs must be GPL compatible to use GPL-ed functions */
	is_gpl = license_is_gpl_compatible(license);

	/* plain bpf_prog allocation */
	prog = bpf_prog_alloc(bpf_prog_size(attr->insn_cnt), GFP_USER); /* 分配内核 bpf_prog 程序数据结构空间 */
	if (!prog)
		return -ENOMEM;

	prog->expected_attach_type = attr->expected_attach_type;

	prog->aux->offload_requested = !!attr->prog_ifindex;

	err = security_bpf_prog_alloc(prog->aux);
	if (err)
		goto free_prog_nouncharge;

	err = bpf_prog_charge_memlock(prog);
	if (err)
		goto free_prog_sec;

	prog->len = attr->insn_cnt;

	err = -EFAULT;
	if (copy_from_user(prog->insns, u64_to_user_ptr(attr->insns),
			bpf_prog_insn_size(prog)) != 0)  //将若干指令从用户态拷贝到内核态
		goto free_prog;

	prog->orig_prog = NULL;
	prog->jited = 0;

	atomic_set(&prog->aux->refcnt, 1);
	prog->gpl_compatible = is_gpl ? 1 : 0;	//设置gpl_compatible字段

	... ...
	/* run eBPF verifier */
	err = bpf_check(&prog, attr);  //运行verifier 检查字节码安全性
	if (err < 0)
		goto free_used_maps;

	prog = bpf_prog_select_runtime(prog, &err); //这里调用do_jit 将bpf字节码转换成汇编码
	if (err < 0)
		goto free_used_maps;

	err = bpf_prog_alloc_id(prog);
	if (err)
		goto free_used_maps;

	bpf_prog_kallsyms_add(prog);  //添加kallsyms

	err = bpf_prog_new_fd(prog);
	if (err < 0)
		bpf_prog_put(prog);
	return err;
	... ...
}

运行 BPF 机器码

JIT 编译器将机器汇编码的首地址转换成一个函数指针,保存到 prog->bpf_func,再看看哪里调用 prog->bpf_func 这个函数指针的呢?

在 debugfs 中创建 kprobe events,执行 init_kprobe_trace 加载 BPF 字节码的时候就调用了 trace_kprobe_create 继而调用 kprobe_dispatcher,因为定义了 CONFIG_PERF_EVENTS 而后调用 kprobe_perf_func,相关代码如下:

static struct dyn_event_operations trace_kprobe_ops = {
	.create = trace_kprobe_create,
	.show = trace_kprobe_show,
	.is_busy = trace_kprobe_is_busy,
	.free = trace_kprobe_release,
	.match = trace_kprobe_match,
};

/* Make a tracefs interface for controlling probe points */
static __init int init_kprobe_trace(void)
{
	... ...

	ret = dyn_event_register(&trace_kprobe_ops);
	if (ret)
		return ret;

	if (register_module_notifier(&trace_kprobe_module_nb))
		return -EINVAL;

	d_tracer = tracing_init_dentry();
	if (IS_ERR(d_tracer))
		return 0;

	entry = tracefs_create_file("kprobe_events", 0644, d_tracer,
				NULL, &kprobe_events_ops);

	... ...
	return 0;
}
fs_initcall(init_kprobe_trace);

trace_kprobe_create
{

   ... ...
   kprobe_dispatcher
   ... ...

}

static int kprobe_dispatcher(struct kprobe *kp, struct pt_regs *regs)
{
	struct trace_kprobe *tk = container_of(kp, struct trace_kprobe, rp.kp);
	int ret = 0;

	raw_cpu_inc(*tk->nhit);

	if (trace_probe_test_flag(&tk->tp, TP_FLAG_TRACE))
		kprobe_trace_func(tk, regs);
	#ifdef CONFIG_PERF_EVENTS
	if (trace_probe_test_flag(&tk->tp, TP_FLAG_PROFILE))
		ret = kprobe_perf_func(tk, regs);
	#endif
	return ret;
}

kprobe_perf_func 会调用 trace_call_bpf,在这里会执行 bpf 程序。BPF_PROG_RUN_ARRAY_CHECK 是一个宏,其实质上执行 BPF_PROG_RUN 里的一个函数:

/* Kprobe profile handler */
static int
kprobe_perf_func(struct trace_kprobe *tk, struct pt_regs *regs)
{
	if (bpf_prog_array_valid(call)) {
		unsigned long orig_ip = instruction_pointer(regs);
		int ret;

		ret = trace_call_bpf(call, regs);

		/*
		* We need to check and see if we modified the pc of the
		* pt_regs, and if so return 1 so that we don't do the
		* single stepping.
		*/
		if (orig_ip != instruction_pointer(regs))
			return 1;
		if (!ret)
			return 0;
	}

	return 0;
}


/ * trace_call_bpf - invoke BPF program
 * @call: tracepoint event
 * @ctx: opaque context pointer
 *
 * kprobe handlers execute BPF programs via this helper.
 * Can be used from static tracepoints in the future.
 *
 * Return: BPF programs always return an integer which is interpreted by
 * kprobe handler as:
 * 0 - return from kprobe (event is filtered out)
 * 1 - store kprobe event into ring buffer
 * Other values are reserved and currently alias to 1
 */
unsigned int trace_call_bpf(struct trace_event_call *call, void *ctx)
{
	unsigned int ret;

	... ...
	ret = BPF_PROG_RUN_ARRAY_CHECK(call->prog_array, ctx, BPF_PROG_RUN);  //运行bpf程序

out:
	__this_cpu_dec(bpf_prog_active);
	preempt_enable();

	return ret;
}

其中的 trace_event_call 结构体定义了 bpf_prog_array,该结构体数组中包含了要执行的函数指针:

struct trace_event_call {
	struct list_head	list;
	struct trace_event_class *class;
	union {
		char			*name;
		/* Set TRACE_EVENT_FL_TRACEPOINT flag when using "tp" */
		struct tracepoint	*tp;
	};
	... ...
#ifdef CONFIG_PERF_EVENTS
	int				perf_refcount;
	struct hlist_head __percpu	*perf_events;
	struct bpf_prog_array __rcu	*prog_array;

	int	(*perf_perm)(struct trace_event_call *,
				struct perf_event *);
#endif
};

struct bpf_prog_array {
	struct rcu_head rcu;
	struct bpf_prog_array_item items[0];
};

struct bpf_prog_array_item {
	struct bpf_prog *prog;
	struct bpf_cgroup_storage *cgroup_storage[MAX_BPF_CGROUP_STORAGE_TYPE];
};

BPF_PROG_RUN_ARRAY_CHECKBPF_PROG_RUN 宏展开如下所示,实质是在 BPF_PROG_RUN 中调用 ret = (*(prog)->bpf_func)(ctx, (prog)->insnsi) 这个函数指针来执行 bpf 指令:

#define BPF_PROG_RUN_ARRAY_CHECK(array, ctx, func)	\
	__BPF_PROG_RUN_ARRAY(array, ctx, func, true)


#define __BPF_PROG_RUN_ARRAY(array, ctx, func, check_non_null)	\
	({						\
		struct bpf_prog_array_item *_item;	\
		struct bpf_prog *_prog; 		\
		struct bpf_prog_array *_array;		\
		u32 _ret = 1;				\
		preempt_disable();			\
		rcu_read_lock();			\
		_array = rcu_dereference(array);	\
		if (unlikely(check_non_null && !_array))\
			goto _out;			\
		_item = &_array->items[0];		\
		while ((_prog = READ_ONCE(_item->prog))) {		\
			bpf_cgroup_storage_set(_item->cgroup_storage);	\
			_ret &= func(_prog, ctx);	\
			_item++;			\
		}					\
_out:							\
		rcu_read_unlock();			\
		preempt_enable();			\
		_ret;					\
	 })



#define BPF_PROG_RUN(prog, ctx) ({				\
	u32 ret;						\
	cant_sleep();						\
	if (static_branch_unlikely(&bpf_stats_enabled_key)) {	\
		struct bpf_prog_stats *stats;			\
		u64 start = sched_clock();			\
		ret = (*(prog)->bpf_func)(ctx, (prog)->insnsi); \
		stats = this_cpu_ptr(prog->aux->stats); 	\
		u64_stats_update_begin(&stats->syncp);		\
		stats->cnt++;					\
		stats->nsecs += sched_clock() - start;		\
		u64_stats_update_end(&stats->syncp);		\
	} else {						\
		ret = (*(prog)->bpf_func)(ctx, (prog)->insnsi); \
	}							\
	ret; })

参考资料

  1. BPF Program Type


Read Album:

Read Related:

Read Latest: