FastDCS是一个使用 C++ 开发的轻量级的分布式计算系统,使用它可以解决海量数据的计算和需要分布式服务方面的问题。2013年FastDCS进行了代码重构,大幅度提升了系统的性能和可靠性,并将代码和相关设计文档全部开源。[源码下载地址]
从系统组成划分的角度来看,FastDCS包括了管理节点(Master)和工作节点(Worker) - 管理节点Master对整个计算集群的服务状态、任务分发、计算调度等服务进行管理。 - 工作节点Worker作为计算单元接受Master服务器的管理,完成整个计算集群的计算任务。
[注1]:此处的Master服务器是指Primary Master或Secondary master服务器。
FastDCS源代码通过GNU General Public License V3进行发布。
FastDCS是一个使用C++开发的轻量级的分布式计算系统,使用它可以解决海量数据的实时计算和需要分布式服务方面的问题。
从系统组成划分的角度来看,FastDCS包括了管理节点(Master)和工作节点(Worker)
FastDCS系统整体架构
如图1-3-1,系统处理流程有如下几个部分组成:
计算任务处理流程
如图1-3-2,计算任务处理流程有如下几个部分组成:
FastDCS通过同时运行多个管理节点Master来避免服务集群的单点故障问题,多个Master节点之间通过环形算法选举出Primary master节点对服务集群进行主控,其它的Secondary master节点承担一部分辅助管理,确保了服务集群的可靠性的同时也避免了单点过载的问题。
Master选举算法
如图1-4-1,FastDCS系统中所有Master节点按照前后顺序组成一个环新形结构,每个Master节点都保存了这个环形结构的排序表,因此每个Master节点都知道自己的所有后继Master节点。
Master选举算法说明
假设当Master(1)节点发现到Primary master不再工作时,它将启动一个召集选举的过程:
同样,在环选举算法中,也可能同时存在多个召集选举的过程。当在这个时刻环结构不变时,只是消息数量多一些,最后的结果也是一致的。
primary-secondary协议
如图1-4-2,FastDCS采用primary-secondary(也称 primary-backup)的中心化副本控制协议。 在FastDCS系统中,数据被分为由Primary master节点管理着的元数据和由Secondary master节点管理着的数据副本。
Lease机制
如图1-4-3,FastDCS采用Lease机制能够确保在服务器或网络异常等情况下,仍然保持分散在服务集群中的数据具有很强的一致性。
FastDCS中的数据结构定义文件在/src/server/tracker_protocol.proto文件中。
其中只有FdcsTask、KeyValuesPair和KeyValuePair三个数据结构需要开发者了解掌握,其他的数据结构用于FastDCS系统内部的处理,与开发者无关。
数据结构FdcsTask用于计算任务的调度和保存计算结果,为了让FdcsTask能够以一种通用的结构满足各种应用场景中数据传输的需要, FastDCS使用了可以包含多个KV键值对的变量来满足需求,同时使用Google Protocol Buffers对系统内部的数据结构进行定义以及序列化操作。
message FdcsTask {
// 任务ID
required string task_id = 1 [default = ""];
// 任务租约有效时间
optional int64 lease_time = 2 [default = 0];
// 多个KeyValuesPair数据结构
repeated KeyValuesPair key_values_pairs = 3;
}
// 为方便理解FdcsTask数据结构
// 下面列出了在Demo程序'单词排序应用实例'中的一个计算任务FdcsTask变量内部的数据内容如下:
FdcsTask = {
task_id = id1,
key_values_pairs = [
{key='A', value=[{key=单词1, value=count1},{key=单词11, value=count11}, ...]}
{key='B', value=[{key=单词1, value=count1},{key=单词11, value=count11}, ...]}
{key='C', value=[{key=单词1, value=count1},{key=单词11, value=count11}, ...]}
...
]
}
FastDCS中的计算任务数据结构FdcsTask有只有3个变量组成,分别是任务ID、任务租约有效时间、一对多KV键值对三个部分组成:
message KeyValuesPair {
optional bytes key = 1;
// value中以字节的方式保存了多个KeyValuePair数据结构
repeated bytes value = 2;
}
// 为方便理解KeyValuesPair数据结构
// 下面列出了在Demo程序'单词排序应用实例'中的一个计算任务KeyValuesPair变量内部的数据内容如下:
key_values_pairs = {key='A', value=[{key=单词1, value=count1},{key=单词11, value=count11}, ...]}
message KeyValuePair {
optional bytes key = 1;
optional bytes value = 2;
}
// 为方便理解KeyValuesPair数据结构
// 下面列出了在Demo程序'单词排序应用实例'中的一个计算任务KeyValuePair变量内部的数据内容如下:
{key=单词1, value=count1}
KeyValuePair其中的Key用来保存自定义变量的名称,value用来保存该自定义变量的数据;
value中的数据可以使用具体的数据进行填充;
注意:KeyValuesPair 和 KeyValuePair数据结构名称只差一个表示是复数的字母s;
FastDCS通过简洁而有效的设计,为开发者提供一套简洁而有效的分布式计算框架,开发者只需要定制由FastDCS提供的几个用户自定义UDF函数(User defined function),就可以完成整个分布式系统的开发工作。
开发者可以通过自定义Master节点类实现自定义的FastDCS管理节点功能,需要处理的有如下几个部分:
DemoMaster
,继承 Master 类后即可拥有FastDCS服务集群的管理功能;DemoMaster
中的自定义变量,可以在虚函数 InitialTracker 中实现;DemoMaster
中的自定义变量,可以在虚函数 FinalizeTracker 中实现;DemoMaster
注册到FastDCS中;管理节点Master开发框架
// 1.自定义Master类名`DemoMaster`派生在Master之上
class DemoMaster : public Master {
// 2.Master节点启动后,会调用这个方法
void InitialTracker(struct settings_s settings) {
// 初始化在 DemoMaster 中使用的自定义变量
......
Master::InitialTracker(settings);
};
// 3.Master节点退出时,会调用这个方法
void FinalizeTracker() {
// 释放在 DemoMaster 中使用的自定义变量
......
Master::FinalizeTracker();
};
// 4.FastDCS空闲的时候会主动调用这个方法,将新的计算任务导入到服务集群
// 该方法只会由选举为Primary master的节点进行调用
bool ImportTaskUDF(vector<FdcsTask> &tasks) {
// 你需要将外部存储系统中的计算任务导入到服务集群中
......
return true;
};
// 5.FastDCS会主动调用这个方法,将任务的计算结果导出到外部存储系统
// 该方法只会由选举为Primary master的节点进行调用
bool ExportTaskUDF(vector<FdcsTask> tasks) {
// 你需要将计算保存到外部存储系统中
......
return true;
};
private:
自定义变量;
};
// 6.注册宏必须填写正确,`DemoMaster`是你自定义的Master类名称
REGISTER_FASTDCS_TRACKER(DemoMaster);
开发者可以通过自定义Worker节点类实现自定义的FastDCS工作节点功能,需要处理的有如下几个部分:
DemoWorker
,继承 Worker 类后即可响应FastDCS服务集群的计算任务调度、并行计算等功能;DemoWorker
中的自定义变量,可以在虚函数 InitialTracker 中实现;DemoWorker
中的自定义变量,可以在虚函数 FinalizeTracker 中实现;DemoWorker
注册到FastDCS中;工作节点Worker开发框架
// 1.自定义Worker类名`DemoWorker`派生在Master之上
class DemoWorker : public Worker {
// 2.Worker节点启动后,会调用这个方法
void InitialTracker(struct settings_s settings) {
// 初始化在 DemoWorker 中使用的自定义变量
......
Worker::InitialTracker(settings);
};
// 3.Worker节点退出时,会调用这个方法
void FinalizeTracker() {
// 释放在 DemoWorker 中使用的自定义变量
......
Worker::FinalizeTracker();
};
// 4.FastDCS中有需要计算的任务的时候会主动调用这个方法,
// 开发者自行实现自定义的计算方法
bool ComputingUDF(FdcsTask &task) {
// 开发者自行实现自定义的计算方法
......
return true;
};
private:
自定义变量;
};
// 5.注册宏必须填写正确,`DemoWorker`是你自定义的Worker类名称
REGISTER_FASTDCS_TRACKER(DemoWorker);
FastDCS同样也是非常容易进行部署的,开发者可以轻松的将FastDCS系统部署在很多台Linux服务器中的, FastDCS会自动将这些Linux服务器建立起服务集群,以分布式运行的方式完成开发者的计算任务。
FastDCS只能运行在Linux系统中(需要内核版本高于 2.6 ),目前FastDCS-v0.1.1版本需要Google Protocol Buffers库的支持。
FastDCS系统运行是不需要依赖mysql,但由于FastDCS的demo演示程序是使用mysql作为外部存储系统进行代码编写的, 所以你的系统中需要有mysql数据库,或者有能够提供远程连接的mysql数据库。
编译和安装依赖环境
export PKG_CONFIG_PATH=/usr/local/lib/pkgconfig/
到 ~/.bash_profile
文件中/usr/local/lib/
到 /etc/ld.so.conf
文件中SET(MYSQL_DIR "/usr/local/mysql")
,将你的mysql安装路径替换 /usr/local/mysql
。make install
命令将FastDCS安装到其他路径,你需要修改 set(CMAKE_INSTALL_PREFIX "/home/liuxun/FastDCS")
, 将你自己安装路径替换 /home/liuxun/FastDCS
FastDCS一共有2个配置文件,分别位于 /src/conf/master.conf 和 /src/conf/worker.conf ,这2个配置文件中绝大部分的配置项是通用的。
系统配置参数说明
# Copyright 2013-02-05
# Author: Liu Xun (my@liuxun.org)
#
# FastDCS master config file
# 设置成 true,系统将以服务的方式在后台运行
# 设置成 false,系统将以应用的方式在前台运行
run_by_daemon = true
# 设置成 true,当系统异常崩溃的时候,将产生core文件
# 设置成 false,当系统异常崩溃的时候,不会产生core文件
max_core_file = false
# 三种日志类型的输出文件路径
# 如果不需要输出到文件,可以设置成/dev/null,如:
# info_log = /dev/null
# 如果需要见日志输出到控制台,可以不设置任何值,如:
# info_log =
info_log = /home/liuxun/FastDCS/demo/info.log
warn_log = /home/liuxun/FastDCS/demo/warn.log
err_log = /home/liuxun/FastDCS/demo/err.log
# 日志文件最大容量,默认值10M,支持的单位如下:
# G or g for gigabyte(GB)
# M or m for megabyte(MB)
# K or k for kilobyte(KB)
log_max_size = 10MB
# 指定以哪个用户运行这个程序,如果没有设置,以当前用户执行
# 该设置项只有在 run_by_daemon = true 的时候才有效
run_by_user = liuxun
# 保存PID的文件名称
pid_file = /tmp/fdcs_master.pid
# 系统支持的最大连接数,默认值256
# 这个参数的建议值是服务集群所运行的节点数的5倍
max_connections = 256
# 用于分配给每个Socket链接的缓冲区大小,默认64KB
# 这个参数建议的范围是 [8KB, 512KB]
socket_buff_size = 128KB
# 在Master 或 Worker 类的基础上派生的自定义类名称
# 例如FastDCS提供的demo程序中的Master节点需要配置成
# class_factory = DictMaster
# 例如FastDCS提供的demo程序中的Worker节点需要配置成
# class_factory = DictWorker
class_factory = "please input you master class name"
# 整个master节点集群的服务器IP和端口队列
# 格式为 host1:port1;host2:port2;...
tracker_group = 127.0.0.1:32301;127.0.0.1:32302;127.0.0.1:32303
# 当前节点的IP和端口
tracker_server = 127.0.0.1:32301
# Socket连接超时时间(单位:秒)
# default value is 30s
connect_timeout = 30
# 网络连接超时时间(单位:秒)
# default value is 30s
network_timeout = 60
# 节点发送心跳包的时间间隔(单位:秒)
# 该设置不能小于 lease_timeout 的设置值
heart_beat_interval = 30
# 节点状态报告的时间间隔(单位:秒)
stat_report_interval = 60
# 数据副本有效期时间(单位:秒)
# 该设置不能大于 heart_beat_interval 的设置值
lease_timeout = 10
# Primary master节点
# 每次从外部存储系统加载的计算任务数目(单位:个)
preload_tasks = 100
# Second master节点每次从Primary master节点
# 同步计算任务副本的数目(单位:个)
task_duplicate = 10
# Worker节点中的任务计算线程数
# 仅使用与Worker节点配置
computing_threads = 2
# FastDCS自带mysql的访问封装类的配置项
# FastDCS自身运行是不需要依赖mysql的
# 如果你需要在自定义函数中使用可以在这里进行配置
mysql_database = FastDCS
mysql_host = localhost
mysql_user = FastDCS
mysql_passwd = fastdcs
mysql_port = 3306
在FastDCS的源代码中的/src/demo/目录下有一个完整的开发样例,该样例演示了如何使用FastDCS对/src/demo/dict.txt文件中的英文单词按照出现的次数进行排序,该样例仅用于FastDCS开发讲解并无实际应用价值;
DictMaster
,用来从dict_task表中获取未处理的计算任务单元导入到FastDCS服务集群中,还负责将计算结果保存到dict_word表中;DictWorker
,用来接受DictMaster节点分配的计算任务单元,根据计算任务单元的ID从dict_task表中获取对应的文字内容,然后按照单词进行拆分计算,计算的结果由FastDCS自动发送给DictMaster节点进行保存;// 1.将自定义类'DictMaster'派生在'Master'之上
class DictMaster : public Master {
public:
// 2.Master节点程序启动后,会调用这个方法
void InitialTracker(struct settings_s settings) {
// 初始化自定义变量 mysql_connect_,连接mysql数据库
mysql_connect_.Connect(settings.mysql_database,
settings.mysql_host,
settings.mysql_user,
settings.mysql_passwd,
settings.mysql_port);
// 必须调用基类中的InitialTracker函数,初始化Master类
Master::InitialTracker(settings);
};
// 3.Master节点程序退出时,会调用这个方法
void FinalizeTracker() {
// 由于自定义变量 mysql_connect_ 是自动释放资源的,所以没有自定义变量需要在这里释放
// 必须调用基类中的FinalizeTracker函数,释放Master类
Master::FinalizeTracker();
};
// 4.FastDCS空闲的时候会主动调用这个方法,将新的计算任务导入到服务集群
// 该方法只会由选举为'Primary master'的节点进行调用
// 开发思路:批量获取'dict_task'表中的任务状态字段'task_status'等于'1'未处理的计算任务
// 将它们导入到FastDCS,同时将这批数据的任务状态字段'task_status'设置为'2'(已处理)
bool ImportTaskUDF(vector<FdcsTask> &tasks) {
LOG(INFO) << "DictMaster::ImportTaskUDF()";
std::string select_sql, update_sql, update_where;
update_sql = "UPDATE dict_task SET task_status = 2 WHERE ";
// 根据配置文件master.conf中的'preload_tasks'所配置的参数批量获取
// task_status等于'1'(未处理)的的计算任务
std::string format = "SELECT task_id FROM dict_task WHERE task_status = 1 LIMIT %d;";
SStringPrintf(&select_sql, format.data(), fdcs_env_.PreloadTasks());
LOG(INFO) << select_sql;
MysqlQuery query(&mysql_connect_);
if (!query.Execute(select_sql.data())) {
LOG(ERROR) << "select task from mysql failduer!";
return false;
}
MysqlResult *sql_result = query.Store();
int rows_size = sql_result->RowsSize();
LOG(INFO) << "sql_result->RowsSize() = " << sql_result->RowsSize();
if (0 == rows_size) return false;
// 获取到新的计算任务后,为了避免重复处理
// 将数据库中本次获取的计算任务的'task_status'设置为'2'(正在处理状态)
FdcsTask task;
for (int i = 0; i < rows_size; ++i) {
MysqlRow row = sql_result->FetchRow();
std::string task_id = (char*)row[0];
LOG(INFO) << "task_id = " << task_id;
task.set_task_id(task_id);
tasks.push_back(task);
if (update_where.empty()) {
update_where = "task_id = '" + task_id + "'";
} else {
update_where = update_where + " or task_id = '" + task_id + "'";
}
}
query.FreeResult(sql_result);
update_sql = update_sql + update_where + ";";
// 执行更新task_status状态的SQL语句
if (!query.TryExecute(update_sql.data())) {
LOG(ERROR) << "update task status faildure!";
return false;
}
return true;
};
FastDCS 的作者刘勋,有着 17 年的软件行业和互联网行业开发和架构经验,2008年-2012年曾创办瑞读网,是国内最早期的数字出版云服务提供商,在创业期间开发的 FastDCS 分布式计算系统为瑞读网的客户提供了长期稳定的服务。
本文已由作者授权网易云社区发布,未经允许不得转载。