DevilKing's blog

Original article link

Initial job queue system architecture

The original system is built around a Redis task queue: when the web app enqueues a job, it is written to one of the Redis clusters, and a job with an identical ID to one already pending is deduplicated.

Pools of worker machines poll the Redis clusters, looking for new work. When a worker finds a job in one of the queues it monitors, it moves the job from the pending queue to a list of in-flight jobs, and spawns an asynchronous task to handle it.
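As a rough illustration of that pending-to-in-flight move (this is a sketch, not Slack's actual implementation), a worker loop could look like the following, assuming the go-redis client and illustrative queue names:

package main

import (
    "context"
    "fmt"
    "time"

    "github.com/go-redis/redis/v8"
)

func main() {
    ctx := context.Background()
    rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379"})

    for {
        // Atomically move the next job from the pending queue to an
        // in-flight list, blocking for up to 5 seconds while waiting for work.
        job, err := rdb.BRPopLPush(ctx, "jobs:pending", "jobs:inflight", 5*time.Second).Result()
        if err == redis.Nil {
            continue // nothing to do, poll again
        }
        if err != nil {
            fmt.Println("dequeue error:", err)
            continue
        }

        // Spawn an asynchronous task to handle the job, then remove it
        // from the in-flight list once it completes.
        go func(payload string) {
            // ... execute the job ...
            rdb.LRem(ctx, "jobs:inflight", 1, payload)
        }(job)
    }
}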

Architectural Problems

  • Redis had little operational headroom, particularly with respect to memory. If we enqueued faster than we dequeued for a sustained period, we would run out of memory and be unable to dequeue jobs (because dequeuing also requires having enough memory to move the job into a processing list).
  • Redis connections formed a complete bipartite graph — every job queue client must connect to (and therefore have correct and current information on) every Redis instance.
  • Job workers couldn’t scale independently of Redis — adding a worker resulted in extra polling and load on Redis. This property caused a complex feedback situation where attempting to increase our execution capacity could overwhelm an already overloaded Redis instance, slowing or halting progress.
  • Previous decisions on which Redis data structures to use meant that dequeuing a job required work proportional to the length of the queue. As queues became longer, they became more difficult to empty — another unfortunate feedback loop.
  • The semantics and quality-of-service guarantees provided to application and platform engineers were unclear and hard to define; asynchronous processing on the job queue is fundamental to our system architecture, but in practice engineers were reluctant to use it. Changes to existing features such as our limited deduplication were also extremely high-risk, as many jobs rely on them to function correctly.

We thought about replacing Redis with Kafka altogether, but quickly realized that this route would require significant changes to the application logic around scheduling, executing, and de-duping jobs. In the spirit of pursuing a minimum viable change, we decided to add Kafka in front of Redis rather than replacing Redis with Kafka outright. This would alleviate a critical bottleneck in our system, while leaving the existing application enqueue and dequeue interfaces in place.

New Queue Architecture

Kafkagate, the new service through which jobs are enqueued to Kafka, was designed around the following goals (a minimal producer sketch follows the list):

  • A bias towards availability over consistency
  • Simple client semantics
  • Minimal latency
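A rough sketch of a producer built along these lines, assuming the Shopify/sarama client; the broker address, topic and payload are illustrative, and waiting only for the partition leader's acknowledgement is one way to trade consistency for availability and latency, not necessarily Slack's exact configuration:

package main

import (
    "log"

    "github.com/Shopify/sarama"
)

func main() {
    cfg := sarama.NewConfig()
    // Wait only for the partition leader's ack: lower latency and higher
    // availability, at the cost of possible loss if the leader dies before
    // the write is replicated.
    cfg.Producer.RequiredAcks = sarama.WaitForLocal
    cfg.Producer.Return.Successes = true // required by SyncProducer

    producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, cfg)
    if err != nil {
        log.Fatal(err)
    }
    defer producer.Close()

    // Enqueue one JSON-encoded job.
    _, _, err = producer.SendMessage(&sarama.ProducerMessage{
        Topic: "jobs",
        Value: sarama.StringEncoder(`{"id":"job-123","type":"example"}`),
    })
    if err != nil {
        log.Println("enqueue failed:", err)
    }
}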

JQRelay relays jobs from Kafka to Redis: it decodes each JSON-encoded job to determine which Redis queue it should be enqueued to.

Self-configuration: When a JQRelay instance starts up, it attempts to acquire a Consul lock on a key/value entry corresponding to the Kafka topic. If it gets the lock, it starts relaying jobs from all partitions of this topic. If it loses its lock, it releases all resources and restarts so that a different instance can pick up this topic.
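A minimal sketch of that lock acquisition, assuming the hashicorp/consul/api client; the key path, topic name and restart behaviour are illustrative:

package main

import (
    "log"
    "os"

    "github.com/hashicorp/consul/api"
)

// relayTopic stands in for the actual Kafka-to-Redis relay loop.
func relayTopic(topic string) {}

func main() {
    topic := "jobs-topic-01" // illustrative topic name

    client, err := api.NewClient(api.DefaultConfig())
    if err != nil {
        log.Fatal(err)
    }

    // Try to acquire a lock on the key/value entry for this topic.
    lock, err := client.LockKey("jqrelay/locks/" + topic)
    if err != nil {
        log.Fatal(err)
    }

    // Lock blocks until the lock is acquired; the returned channel is
    // closed if the lock is later lost.
    lostCh, err := lock.Lock(nil)
    if err != nil {
        log.Fatal(err)
    }

    go relayTopic(topic) // start relaying all partitions of this topic

    <-lostCh // lock lost: release resources and restart
    log.Println("lost Consul lock, exiting so another instance can take over")
    os.Exit(1)
}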

Load testing

Failure testing: It was important to understand how different Kafka cluster failure scenarios would manifest in the application, e.g. connection failures, job enqueue failures, missing jobs, and duplicate jobs. For this, we tested our cluster against the following failure scenarios:

  1. Hard kill and gracefully kill a broker
  2. Hard kill and gracefully kill two brokers in a single AZ
  3. Hard kill all three brokers to force Kafka to pick an unclean leader
  4. Restart the cluster

Production Rollout

Rolling out the new system included the following steps:

  1. Double writes: We started by double writing jobs to both the current and new system (each job was enqueued to both Redis and Kafka). JQRelay, however, operated in a “shadow” mode where it dropped all jobs after reading them from Kafka. This setup let us safely test the new enqueue path from web app to JQRelay with real production traffic.
  2. Guaranteeing system correctness: To ensure the correctness of the new system, we tracked and compared the number of jobs passing through each part of the system: from the web app to Kafkagate, Kafkagate to Kafka, and finally Kafka to Redis.
  3. Heartbeat canaries: To ensure that the new system worked end-to-end for 50 Redis clusters and 1600 Kafka partitions (50 topics × 32 partitions), we enqueued heartbeat canaries for every Kafka partition every minute (see the sketch after this list). We then monitored and alerted on the end-to-end flow and timing for these heartbeat canaries.
  4. Final roll-out: Once we were sure of our system correctness, we enabled it internally for Slack for a few weeks. After that showed no problems, we rolled it out one by one for various job types for our customers.
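A rough sketch of what such a canary loop could look like; enqueueCanary and the topic naming scheme are hypothetical stand-ins, not Slack's actual code:

package main

import (
    "fmt"
    "time"
)

const (
    numTopics     = 50
    numPartitions = 32
)

// enqueueCanary is a hypothetical helper that enqueues a heartbeat job
// pinned to a specific topic and partition, carrying its enqueue time so
// the consuming side can measure end-to-end latency and alert on it.
func enqueueCanary(topic string, partition int, sentAt time.Time) error {
    // ... write the canary job through the enqueue path ...
    return nil
}

func main() {
    ticker := time.NewTicker(time.Minute)
    defer ticker.Stop()

    for now := range ticker.C {
        for t := 0; t < numTopics; t++ {
            topic := fmt.Sprintf("jobs-%02d", t) // hypothetical topic name
            for p := 0; p < numPartitions; p++ {
                if err := enqueueCanary(topic, p, now); err != nil {
                    // A partition that cannot accept canaries is probably
                    // not accepting real jobs either.
                    fmt.Println("canary enqueue failed:", topic, p, err)
                }
            }
        }
    }
}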

Original article link

  • The key command is top -Hp [pid], which shows the CPU consumption of every thread inside the process
  • Then jstack pid dumps each thread's stack; convert the decimal thread ID to hexadecimal to match the nid field in the dump

Original article link

Good

New Types

package main

type UserId string // <-- new type
type ProductId string

func AddProduct(userId UserId, productId ProductId) {}

func main() {
    userId := UserId("some-user-id")
    productId := ProductId("some-product-id")

    // Right order: all fine
    AddProduct(userId, productId)

    // Wrong order: would compile with raw strings, but fails here,
    // so it is commented out:
    // AddProduct(productId, userId)
    // Compilation errors:
    // cannot use productId (type ProductId) as type UserId in argument to AddProduct
    // cannot use userId (type UserId) as type ProductId in argument to AddProduct
}

Bad

Go ignored advances in modern language design:

  • functional programming
  • no LLVM
  • GC

Interfaces are structural types

Go interfaces are like Java interfaces or Scala & Rust traits: they define behaviour that is later implemented by a type (I won’t call it “class” here). Unlike those, however, a type never declares that it implements an interface: it satisfies the interface implicitly, simply by having all of its methods, which is why the typing is structural.
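A small sketch of that implicit satisfaction (the names are illustrative):

package main

import "fmt"

type Stringer interface {
    String() string
}

// Point never mentions Stringer, yet it satisfies the interface simply by
// having a String() method with the right signature.
type Point struct{ X, Y int }

func (p Point) String() string {
    return fmt.Sprintf("(%d,%d)", p.X, p.Y)
}

func describe(s Stringer) {
    fmt.Println("as a string:", s.String())
}

func main() {
    describe(Point{1, 2}) // prints "as a string: (1,2)"
}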

No enumerations

Go has no enum type; the closest substitute is a block of constants generated with iota, which provides neither the type safety nor the exhaustiveness checking of real enumerations.
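A minimal illustration of the iota idiom (the names are illustrative):

package main

import "fmt"

type Weekday int

const (
    Sunday Weekday = iota // 0
    Monday                // 1
    Tuesday               // 2
    // ...
)

func main() {
    fmt.Println(Sunday, Monday, Tuesday) // 0 1 2

    // Nothing stops an out-of-range "enum" value:
    var d Weekday = 42
    fmt.Println(d) // 42, and the compiler is perfectly happy
}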

The := / var dilemma

Go provides two ways to declare a variable and assign it a value: var x = "foo" and x := "foo".

The main differences are that var allows declaration without initialization (and you then have to declare the type), like in var x string, whereas := requires assignment and allows a mix of existing and new variables. My guess is that := was invented to make error handling a bit less painful:
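A sketch of the kind of code this enables (the file path is just an example): the second := is legal because at least one variable on its left (info) is new, while err is quietly reused:

package main

import (
    "fmt"
    "os"
)

func main() {
    // f and err are both new: := declares and assigns them in one go.
    f, err := os.Open("/etc/hosts")
    if err != nil {
        fmt.Println("open failed:", err)
        return
    }
    defer f.Close()

    // err already exists, but info is new, so := is still allowed and
    // simply reassigns err instead of redeclaring it.
    info, err := f.Stat()
    if err != nil {
        fmt.Println("stat failed:", err)
        return
    }
    fmt.Println(info.Name(), info.Size())
}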

Zero values that panic

Go has no constructors, so every value starts out as its type's zero value; for some types, as the nil map below shows, that zero value panics as soon as you write to it.

package main

func main() {
    var m1 = map[string]string{} // empty map
    var m0 map[string]string     // zero map (nil)

    println(len(m1))   // outputs '0'
    println(len(m0))   // outputs '0'
    println(m1["foo"]) // outputs ''
    println(m0["foo"]) // outputs ''
    m1["foo"] = "bar"  // ok
    m0["foo"] = "bar"  // panics: assignment to entry in nil map
}

As shown above, m1 has been initialized but m0 has not.

Go doesn’t have exceptions.

The closest mechanism is “Defer, Panic, and Recover”: a panic unwinds the stack, running deferred functions along the way, and a deferred call to recover() can intercept it.
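A minimal sketch of that mechanism, turning a panic back into an ordinary error value:

package main

import "fmt"

// safeDivide converts a division-by-zero panic back into an error value.
func safeDivide(a, b int) (result int, err error) {
    defer func() {
        if r := recover(); r != nil {
            err = fmt.Errorf("recovered: %v", r)
        }
    }()
    return a / b, nil // panics when b == 0
}

func main() {
    fmt.Println(safeDivide(10, 2)) // 5 <nil>
    fmt.Println(safeDivide(1, 0))  // 0 recovered: runtime error: integer divide by zero
}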

Ugly

The dependency management nightmare

Things are getting better though: dep, the official dependency management tool, was recently introduced to support vendoring.

dep may not live long, though, as vgo, also from Google, wants to bring versioning into the language itself and has been making some waves lately.

Mutability is hardcoded in the language

However, Go makes it easy to copy an entire struct with a simple assignment, so we may think that passing arguments by value is all that is needed for immutability, at the cost of copying.

package main

import "fmt"

type S struct {
    A string
    B []string
}

func main() {
    x := S{"x-A", []string{"x-B"}}
    y := x // copy the struct: A is copied, but B still shares its backing array
    y.A = "y-A"
    y.B[0] = "y-B"

    fmt.Println(x, y)
    // Outputs "{x-A [y-B]} {y-A [y-B]}" -- x was modified!
}

And since the built-in map and slice types are references and are mutable, copying a struct that contains one of them just copies the pointer to the same underlying memory (arrays, by contrast, are values and are actually copied).

Slice gotchas

Slices come with many gotchas: as explained in “Go slices: usage and internals”, re-slicing a slice doesn’t copy the underlying array for performance reasons.

This is a laudable goal but means that sub-slices of a slice are just views that follow the mutations of the original slice. So don’t forget to copy() a slice if you want to separate it from its origin.

Forgetting to copy() becomes more dangerous with the append function: appending values to a slice resizes the underlying array if it doesn’t have enough capacity to hold the new values. This means that the result of append may or may not point to the original array depending on its initial capacity, which can cause hard-to-find, non-deterministic bugs. (When appending, if there isn’t enough capacity, the backing array is reallocated, so the resulting slice ends up pointing at different memory than the original.)

package main

import "fmt"

func doStuff(value []string) {
    fmt.Printf("value=%v\n", value)

    value2 := value[:]
    value2 = append(value2, "b")
    fmt.Printf("value=%v, value2=%v\n", value, value2)

    value2[0] = "z"
    fmt.Printf("value=%v, value2=%v\n", value, value2)
}

func main() {
    slice1 := []string{"a"} // length 1, capacity 1

    doStuff(slice1)
    // Output:
    // value=[a] -- ok
    // value=[a], value2=[a b] -- ok: value unchanged, value2 updated
    // value=[a], value2=[z b] -- ok: value unchanged, value2 updated

    slice10 := make([]string, 1, 10) // length 1, capacity 10
    slice10[0] = "a"

    doStuff(slice10)
    // Output:
    // value=[a] -- ok
    // value=[a], value2=[a b] -- ok: value unchanged, value2 updated
    // value=[z], value2=[z b] -- WTF?!? value changed???
}

Mutability and channels: race conditions made easy

As we saw above there is no way in Go to have immutable data structures. This means that once we send a pointer on a channel, it’s game over: we share mutable data between concurrent processes. Of course a channel of structures (and not pointers) copies the values sent on the channel, but as we saw above, this doesn’t deep-copy references, including slices and maps, which are intrinsically mutable. Same goes with struct fields of an interface type: they are pointers, and any mutation method defined by the interface is an open door to race conditions.
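A small sketch of how easily this goes wrong (illustrative code; go run -race flags it): the struct is copied when sent on the channel, but its slice still points at the sender's underlying array, so both goroutines mutate the same memory.

package main

import "sync"

type Payload struct {
    Tags []string
}

func main() {
    ch := make(chan Payload)
    var wg sync.WaitGroup
    wg.Add(2)

    // Consumer: receives a *copy* of the struct, but Tags still points to
    // the sender's underlying array.
    go func() {
        defer wg.Done()
        p := <-ch
        for i := 0; i < 1000; i++ {
            p.Tags[0] = "consumer" // racy write
        }
    }()

    original := Payload{Tags: []string{"initial"}}
    go func() {
        defer wg.Done()
        ch <- original // copies the struct, not the slice's backing array
        for i := 0; i < 1000; i++ {
            original.Tags[0] = "producer" // races with the consumer's writes
        }
    }()

    wg.Wait()
}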

Noisy error management

In “Errors are values”, Rob Pike suggests some strategies to reduce error-handling verbosity:

type errWriter struct {
    w   io.Writer
    err error
}

func (ew *errWriter) write(buf []byte) {
    if ew.err != nil {
        return // Write nothing if we already errored-out
    }
    _, ew.err = ew.w.Write(buf)
}

// p0, p1, p2 and the slice bounds are placeholder buffers from the
// original example.
func doIt(fd io.Writer) error {
    ew := &errWriter{w: fd}
    ew.write(p0[a:b])
    ew.write(p1[c:d])
    ew.write(p2[e:f])
    // and so on
    if ew.err != nil {
        return ew.err
    }
    return nil
}

Nil interface values

package main

type Explodes interface {
    Bang()
    Boom()
}

// Type Bomb implements Explodes
type Bomb struct{}

func (*Bomb) Bang() {}
func (Bomb) Boom()  {}

func main() {
    var bomb *Bomb = nil
    var explodes Explodes = bomb
    println(bomb, explodes) // '0x0 (0x10a7060,0x0)'
    if explodes != nil {
        println("Not nil!") // 'Not nil!' What are we doing here?!?!
        explodes.Bang()     // works fine
        explodes.Boom()     // panic: value method main.Bomb.Boom called using nil *Bomb pointer
    } else {
        println("nil!") // why don't we end up here?
    }
}

Struct field tags: runtime DSL in a string
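The point is that metadata such as JSON field names lives in raw strings that the compiler never checks; a small illustration using encoding/json (the struct is illustrative):

package main

import (
    "encoding/json"
    "fmt"
)

type User struct {
    ID    string `json:"id"`
    Email string `json:"email,omitempty"`
    // A typo in the tag ("emial", a stray space, ...) is not caught at
    // compile time; it only changes behaviour at runtime.
}

func main() {
    b, _ := json.Marshal(User{ID: "42"})
    fmt.Println(string(b)) // {"id":"42"} -- Email omitted thanks to the tag
}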

Go has few data structures beyond slice and map