DevilKing's blog


Original article link

GraphX

Graphframes

GraphX is to RDDs as GraphFrames are to DataFrames

A GraphFrame is always created from a vertex DataFrame (e.g. users) and an edges DataFrame (e.g. relationships between users). The schema of both DataFrames has some mandatory columns. The vertex DataFrame must contain a column named id that stores unique vertex IDs. The edges DataFrame must contain a column named src that stores the source of the edge and a column named dst that stores the destination of the edge. All other columns are optional and can be added depending on one’s needs.

g = GraphFrame(vertices, edges)
## Take a look at the DataFrames
g.vertices.show()
g.edges.show()
## Check the number of edges of each vertex
g.degrees.show()
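What `g.degrees` computes can be sketched without Spark. A plain-Python illustration over a made-up edge list (this is not the GraphFrames implementation, just the same idea):

```python
from collections import Counter

def degrees(edges):
    """Count how many edges touch each vertex (in-degree + out-degree),
    mirroring what g.degrees returns for a directed graph."""
    c = Counter()
    for src, dst in edges:
        c[src] += 1
        c[dst] += 1
    return dict(c)

# Hypothetical edge list: (src, dst) pairs, as in the edges DataFrame.
edges = [("a", "b"), ("b", "c"), ("c", "b")]
print(degrees(edges))  # {'a': 1, 'b': 3, 'c': 2}
```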

Directed vs. undirected edges

A GraphFrame itself can’t be filtered, but the DataFrames derived from a graph can. Consequently, filter (or any other DataFrame function) can be used just as you would use it with ordinary DataFrames.

Connected components of the graph

Motif finding

Finding motifs helps to execute queries that discover structural patterns in graphs.

As an example we can try to find the mutual friends for any pair of users a and c. In order for b to be a mutual friend, b must be friends with both a and c (and not just followed by c, for example).

mutualFriends = g.find("(a)-[]->(b); (b)-[]->(c); (c)-[]->(b); (b)-[]->(a)")\
    .dropDuplicates()
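The same motif can be sketched in plain Python over a directed edge list: b is a mutual friend of a and c when all four edges a→b, b→a, b→c, c→b exist. The toy graph below is made up, and unlike GraphFrames this sketch only considers triples of distinct vertices:

```python
from itertools import permutations

def mutual_friends(edges):
    """Find triples (a, b, c) where b is friends with both a and c,
    i.e. the motif (a)->(b); (b)->(a); (b)->(c); (c)->(b)."""
    es = set(edges)
    nodes = {v for e in edges for v in e}
    return sorted(
        (a, b, c)
        for a, b, c in permutations(nodes, 3)
        if (a, b) in es and (b, a) in es and (b, c) in es and (c, b) in es
    )

# Hypothetical toy graph: mutual friendships a<->b and b<->c.
edges = [("a", "b"), ("b", "a"), ("b", "c"), ("c", "b")]
print(mutual_friends(edges))  # [('a', 'b', 'c'), ('c', 'b', 'a')]
```

Note that each qualifying pair appears twice (once per direction), which is why the GraphFrames version follows up with `dropDuplicates()`.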

TriangleCount and PageRank

Two algorithms that fall out naturally from GraphFrames.

Background

spark 1.6.2 -> spark 2.2

How data is organized in memory

(figure: in-memory data layout)

In memory, the layout was changed to be primarily columnar. This brings three benefits:

(1) Zero serialization when exchanging data with numpy or TensorFlow

(2) Seamless compatibility with Spark’s in-memory columnar cache

(3) Easier adoption of compression techniques
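The row vs. columnar distinction, and why columnar data compresses so cheaply, can be illustrated in plain Python (the data is made up; this is a sketch of the idea, not Spark's format):

```python
# Row layout: one tuple per record.
rows = [("a", 1), ("a", 1), ("b", 2), ("b", 2)]

# Columnar layout: one list per column. Values of a column sit
# contiguously, so runs of equal values are common and compressible.
columns = {
    "key": [r[0] for r in rows],
    "val": [r[1] for r in rows],
}

def run_length_encode(values):
    """Simple run-length encoding; works well on columnar data."""
    out = []
    for v in values:
        if out and out[-1][0] == v:
            out[-1] = (v, out[-1][1] + 1)
        else:
            out.append((v, 1))
    return out

print(run_length_encode(columns["key"]))  # [('a', 2), ('b', 2)]
```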

CBO

The cost-based optimizer (CBO) computes the cost of every candidate physical plan and selects the cheapest one to execute. The key requirement is being able to estimate the cost of a physical execution plan.
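Stripped of Spark's actual cost model, the core idea is just "estimate a cost per candidate plan, pick the minimum." The plan names and costs below are invented purely for illustration:

```python
def choose_plan(plans, cost_fn):
    """Pick the physical plan whose estimated cost is smallest."""
    return min(plans, key=cost_fn)

# Hypothetical candidate plans with made-up cost estimates.
estimated_cost = {
    "SortMergeJoin": 120.0,
    "BroadcastJoin": 15.0,
    "ShuffleHashJoin": 90.0,
}
best = choose_plan(list(estimated_cost), estimated_cost.get)
print(best)  # BroadcastJoin
```

The hard part in practice is not the `min` — it is producing trustworthy cost estimates, which is exactly what CBO's statistics (e.g. post-filter cardinalities) enable.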

  • The build-side vs. probe-side choice in HashJoin

  • In Spark SQL, joins are mainly executed in two ways: shuffle-based join and broadcast join. A shuffle-based join incurs a shuffle, which is relatively expensive. A broadcast join needs no shuffle, but requires at least one table to be small enough to broadcast to every Executor.

    As the figure below shows, without CBO, Spark SQL decides whether to use a broadcast join via spark.sql.autoBroadcastJoinThreshold (default 10 MB). Since that decision is based on the raw table sizes, it falls back to SortMergeJoin.

    With CBO, when the size of Table 2 after the Filter falls below 10 MB, the optimizer correctly chooses the better-performing BroadcastJoin.

(figure: join strategy selection with and without CBO)

  • The case of parallel joins
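Both ideas above — build/probe sides and the broadcast threshold — can be sketched in plain Python. The tables, sizes, and threshold handling are illustrative, not Spark's implementation: a hash join builds a hash table from the smaller (build) side and probes it with the larger (probe) side.

```python
def hash_join(build_rows, probe_rows, build_key, probe_key):
    """Hash join: build a hash table on the small side, probe with the large side."""
    table = {}
    for row in build_rows:
        table.setdefault(row[build_key], []).append(row)
    return [
        (b, p)
        for p in probe_rows
        for b in table.get(p[probe_key], [])
    ]

def pick_join_strategy(estimated_size_bytes, threshold=10 * 1024 * 1024):
    """Mimic the autoBroadcastJoinThreshold decision: broadcast when the
    (post-filter, if CBO is on) size estimate is under the threshold."""
    return "BroadcastJoin" if estimated_size_bytes < threshold else "SortMergeJoin"

small = [{"id": 1, "name": "x"}, {"id": 2, "name": "y"}]
large = [{"uid": 1, "v": 10}, {"uid": 1, "v": 20}, {"uid": 3, "v": 30}]
print(len(hash_join(small, large, "id", "uid")))  # 2
print(pick_join_strategy(5 * 1024 * 1024))        # BroadcastJoin
```

This also shows why CBO matters here: feeding `pick_join_strategy` the raw table size instead of the post-filter estimate can misclassify a broadcastable table.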

When GPUs are scarce, TensorFlow makes it fairly easy to build a custom optimizer for this: wrap the actual optimizer and perform just two steps — gradient accumulation and a delayed update.

import tensorflow as tf


class LazyUpdateOptimizer(tf.train.Optimizer):
    """Wraps another optimizer: accumulates gradients and applies the sum
    once every `batch_size` steps (gradient accumulation / delayed update)."""

    def __init__(self, optimizer, batch_size=1,
                 use_locking=True, name="LazyUpdateOptimizer"):
        tf.train.Optimizer.__init__(self, use_locking=use_locking, name=name)

        self._name = name
        self._batch_size = batch_size
        self._grad_cache = {}
        self._optimizer = optimizer
        self._vars = []

        with tf.variable_scope(self._name):
            # Counts down from batch_size; the wrapped optimizer fires at 0.
            self._batch_count_variable = \
                tf.get_variable(name="batch_count",
                                shape=[],
                                dtype=tf.int64,
                                initializer=tf.constant_initializer(self._batch_size),
                                collections=[tf.GraphKeys.LOCAL_VARIABLES])
            self._vars.append(self._batch_count_variable)

    @property
    def optimizer(self):
        return self._optimizer

    @property
    def name(self):
        return self._name

    @property
    def batch_size(self):
        return self._batch_size

    def get_initializer(self):
        return tf.group(*[v.initializer for v in self._vars])

    def apply_gradients(self, grads_and_vars, global_step=None, name=None):
        scope_name = self._name
        if name is not None:
            scope_name += "_" + name

        # Drop entries without gradients up front so grads_and_vars and
        # cached_grads stay aligned when zipped in __cache_gradients.
        grads_and_vars = [(g, v) for g, v in grads_and_vars if g is not None]

        cached_grads = []
        for grad, var in grads_and_vars:
            if var not in self._grad_cache:
                with tf.variable_scope(scope_name):
                    with tf.colocate_with(var):
                        cached_grad = tf.get_variable(
                            name=var.name.split(":")[0] + "_grad_cache",
                            dtype=var.dtype,
                            shape=var.shape,
                            initializer=tf.zeros_initializer(),
                            trainable=False,
                            collections=[tf.GraphKeys.LOCAL_VARIABLES])
                self._vars.append(cached_grad)
                self._grad_cache[var] = cached_grad
            cached_grads.append(self._grad_cache[var])

        with tf.name_scope(scope_name):
            cache_gradients_op = self.__cache_gradients(grads_and_vars, cached_grads)

            with tf.control_dependencies([cache_gradients_op]):
                # Apply the accumulated gradients only when the counter hits 0.
                apply_op = tf.cond(
                    tf.equal(self._batch_count_variable, 0),
                    true_fn=lambda: self.__actual_apply_gradients(
                        grads_and_vars, global_step=global_step),
                    false_fn=lambda: tf.no_op())
            with tf.control_dependencies([apply_op]):
                return tf.no_op()

    def __cache_gradients(self, grads_and_vars, cached_grads):
        update_ops = []
        with tf.name_scope("cache_grad"):
            # `zip` replaces the Python 2-only itertools.izip.
            for (grad, var), cached_grad in zip(grads_and_vars, cached_grads):
                with tf.colocate_with(cached_grad):
                    if isinstance(grad, tf.Tensor):
                        update_op = tf.assign_add(cached_grad, grad)
                    elif isinstance(grad, tf.IndexedSlices):
                        update_op = tf.scatter_add(cached_grad, grad.indices, grad.values)
                    else:
                        continue
                update_ops.append(update_op)
            with tf.control_dependencies([tf.group(*update_ops, name="record_gradients")]):
                return tf.assign_sub(self._batch_count_variable, 1)

    def __actual_apply_gradients(self, grads_and_vars, global_step=None):
        actual_grads_and_vars = [(self._grad_cache[var], var)
                                 for grad, var in grads_and_vars]

        apply_op = self._optimizer.apply_gradients(
            actual_grads_and_vars, global_step=global_step)
        with tf.control_dependencies([apply_op]):
            # Reset the counter and zero the caches for the next window.
            reset_ops = [tf.assign(self._batch_count_variable, self._batch_size)]
            for grad, var in actual_grads_and_vars:
                reset_ops.append(tf.assign(self._grad_cache[var], tf.zeros_like(var)))

            with tf.control_dependencies(reset_ops):
                return tf.no_op()
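Framework aside, the two steps the wrapper performs — accumulate, then apply the sum every N calls — amount to the following plain-Python sketch, which uses a made-up SGD step instead of a wrapped TensorFlow optimizer:

```python
class LazyUpdate:
    """Accumulate gradients and apply them once every `batch_size` calls.
    Plain-Python sketch of the optimizer wrapper; `lr` is a made-up SGD step."""

    def __init__(self, batch_size, lr=0.1):
        self.batch_size = batch_size
        self.lr = lr
        self.cache = 0.0
        self.count = batch_size

    def apply_gradient(self, param, grad):
        self.cache += grad      # step 1: accumulate into the gradient cache
        self.count -= 1
        if self.count == 0:     # step 2: delayed update, then reset
            param -= self.lr * self.cache
            self.cache = 0.0
            self.count = self.batch_size
        return param

opt = LazyUpdate(batch_size=3)
w = 1.0
for g in [0.5, 0.5, 1.0]:      # the update fires only on the third gradient
    w = opt.apply_gradient(w, g)
print(w)  # 0.8  (1.0 - 0.1 * (0.5 + 0.5 + 1.0))
```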

Stay consistent throughout.

Treat all alike.

Press forward without retreat.


This week's work:

  • Finished the second version of the featureGraph integration
  • Studied the online-prediction part, especially the new fe component
  • Wrote some documentation

This week's takeaways:

  • The soundness and self-testability of code
  • Picked code review back up

Next week:

  • Optimizing the tuningEntryPoint framework
  • Detailed design for the online-prediction plan

Unsurprisingly, I cried again; the pressure is just too much... Still, calm down and take good care of things. I can't be as impressive as Chen Ping'an, but I can at least try to stay consistent, not slack off or let up, and seek both peace of mind and things going my way.

At work I feel I've relaxed a little. Finishing one iteration doesn't mean I've mastered things; all those bugs of mine, plus the gaps in the design, show exactly how far I still have to go... and then the next iteration falls apart. I still can't control the rhythm. Do my best; I've also prepared for the worst. Even if I can't accomplish much, at least leave no regrets and see things through.

Skipped exercise again... it's been a month now; no more skipping, hah.

As for the Jenkins source-code analysis, I'm aiming to share it by the end of November... doing my best.

Original article link

We’re currently in the process of migrating all 10,000 of the scheduled jobs running on the Netflix Data Platform to use notebook-based execution.

Notebooks are, in essence, managed JSON documents with a simple interface to execute code within.
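"Managed JSON documents" is literal: an .ipynb file is JSON whose cells carry both source and outputs. A minimal sketch of pulling the code out (the notebook content below is fabricated, and this ignores most nbformat details):

```python
import json

# A fabricated, minimal nbformat-style notebook document.
raw = json.dumps({
    "cells": [
        {"cell_type": "markdown", "source": ["# Title"]},
        {"cell_type": "code", "source": ["x = 1\n", "print(x)"], "outputs": []},
    ]
})

def code_cells(notebook_json):
    """Extract the source of each code cell from a notebook document."""
    nb = json.loads(notebook_json)
    return ["".join(c["source"]) for c in nb["cells"] if c["cell_type"] == "code"]

print(code_cells(raw))  # ['x = 1\nprint(x)']
```

The separately stored `outputs` field is exactly the problem the next paragraph describes: nothing forces the saved outputs to match the current code.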

Notebooks pose a lot of challenges: they’re frequently changed, their cell outputs need not match the code, they’re difficult to test, and there’s no easy way to dynamically configure their execution. Furthermore, you need a notebook server to run them, which creates architectural dependencies to facilitate execution.