One way to get started is to modify mr/worker.go’s Worker() to send an RPC to the master asking for a task. Then modify the master to respond with the file name of an as-yet-unstarted map task. Then modify the worker to read that file and call the application Map function, as in mrsequential.go.
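A minimal sketch of one such round, using the `call()` helper that ships in the lab's `worker.go` skeleton and the `MapReduceArgs`/`MapReduceReply` types that appear later in this post (the `Task`/`Filename` fields are illustrative assumptions):

```go
package mr

import (
	"io/ioutil"
	"log"
)

// requestAndRunMap: ask the master for a task over RPC, then run the
// application Map function on the returned file, as mrsequential.go does.
func requestAndRunMap(mapf func(string, string) []KeyValue) {
	args := MapReduceArgs{}
	reply := MapReduceReply{}
	if !call("Master.MapReduceHandler", &args, &reply) {
		return // master unreachable; assume the job is done
	}
	filename := reply.Task.Filename
	content, err := ioutil.ReadFile(filename)
	if err != nil {
		log.Fatalf("cannot read %v", filename)
	}
	kva := mapf(filename, string(content))
	_ = kva // next: partition kva into NumReduce buckets (see below)
}
```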
If you change anything in the mr/ directory, you will probably have to re-build any MapReduce plugins you use, with something like `go build -buildmode=plugin ../mrapps/wc.go`.
**You have to re-build the plugin every time!** Otherwise it simply fails; I didn't notice this at first and spent quite a while debugging.
This lab relies on the workers sharing a file system. That’s straightforward when all workers run on the same machine, but would require a global filesystem like GFS if the workers ran on different machines.
The worker’s map task code will need a way to store intermediate key/value pairs in files in a way that can be correctly read back during reduce tasks. One possibility is to use Go’s encoding/json package. First partition the pairs into NumReduce buckets by key hash, then write each bucket to its own file:
```go
kvaa := make([][]KeyValue, task.NumReduce)
for _, kv := range kva {
	idx := ihash(kv.Key) % task.NumReduce
	kvaa[idx] = append(kvaa[idx], kv)
}

for i := 0; i < task.NumReduce; i++ {
	storeIntermediateFile(kvaa[i], intermediateFilename(task.TaskNum, i))
}
```
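A sketch of what `storeIntermediateFile` / `loadIntermediateFile` might look like with encoding/json, using the lab's suggested mr-X-Y naming convention for intermediate files:

```go
package mr

import (
	"encoding/json"
	"fmt"
	"os"
)

// intermediateFilename follows the lab's suggested mr-X-Y convention,
// where X is the map task number and Y the reduce bucket.
func intermediateFilename(mapTask, reduceBucket int) string {
	return fmt.Sprintf("mr-%v-%v", mapTask, reduceBucket)
}

// storeIntermediateFile writes one JSON-encoded KeyValue per line.
func storeIntermediateFile(kva []KeyValue, filename string) error {
	f, err := os.Create(filename)
	if err != nil {
		return err
	}
	defer f.Close()
	enc := json.NewEncoder(f)
	for _, kv := range kva {
		if err := enc.Encode(&kv); err != nil {
			return err
		}
	}
	return nil
}

// loadIntermediateFile reads the pairs back for a reduce task.
func loadIntermediateFile(filename string) []KeyValue {
	f, err := os.Open(filename)
	if err != nil {
		return nil
	}
	defer f.Close()
	var kva []KeyValue
	dec := json.NewDecoder(f)
	for {
		var kv KeyValue
		if err := dec.Decode(&kv); err != nil {
			break // io.EOF ends the stream
		}
		kva = append(kva, kv)
	}
	return kva
}
```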
You can steal some code from mrsequential.go for reading Map input files, for sorting intermediate key/value pairs between the Map and Reduce, and for storing Reduce output in files.
```go
func reduceTask(reducef func(string, []string) string, task MapReduceTask) {
	var intermediate []KeyValue
	for _, filename := range task.ReduceFiles {
		intermediate = append(intermediate, loadIntermediateFile(filename)...)
	}
	sort.Sort(ByKey(intermediate))

	oname := fmt.Sprintf("mr-out-%v", task.TaskNum)
	ofile, _ := os.Create(oname)

	// call Reduce on each distinct key in intermediate[],
	// and print the result to mr-out-X.
	i := 0
	for i < len(intermediate) {
		j := i + 1
		for j < len(intermediate) && intermediate[j].Key == intermediate[i].Key {
			j++
		}
		values := []string{}
		for k := i; k < j; k++ {
			values = append(values, intermediate[k].Value)
		}
		output := reducef(intermediate[i].Key, values)

		// this is the correct format for each line of Reduce output.
		fmt.Fprintf(ofile, "%v %v\n", intermediate[i].Key, output)

		i = j
	}

	ofile.Close()

	// tell the master this reduce task is finished
	defer finishTask(task)
}
```
The master, as an RPC server, will be concurrent; don’t forget to lock shared data.
```go
// Your code here -- RPC handlers for the worker to call.

func (m *Master) MapReduceHandler(args *MapReduceArgs, reply *MapReduceReply) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	...
}
```
Use Go’s race detector, with go build -race and go run -race. test-mr.sh has a comment that shows you how to enable the race detector for the tests.
Workers will sometimes need to wait, e.g. reduces can’t start until the last map has finished. One possibility is for workers to periodically ask the master for work, sleeping with time.Sleep() between each request. Another possibility is for the relevant RPC handler in the master to have a loop that waits, either with time.Sleep() or sync.Cond. Go runs the handler for each RPC in its own thread, so the fact that one handler is waiting won’t prevent the master from processing other RPCs.
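A sketch of the first option, worker-side polling; `requestTask`, `runTask`, and the `"wait"` task type are illustrative assumptions, not names from the lab skeleton:

```go
package mr

import "time"

// workerLoop: keep asking the master for work and sleep briefly
// whenever it has nothing runnable yet.
func workerLoop() {
	for {
		task, ok := requestTask() // hypothetical wrapper around the RPC
		if !ok {
			return // master no longer answers: the job is finished
		}
		if task.TaskType == "wait" {
			time.Sleep(500 * time.Millisecond) // nothing to do yet
			continue
		}
		runTask(task) // dispatch to the map or reduce path
	}
}
```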
The master can’t reliably distinguish between crashed workers, workers that are alive but have stalled for some reason, and workers that are executing but too slowly to be useful. The best you can do is have the master wait for some amount of time, and then give up and re-issue the task to a different worker. For this lab, have the master wait for ten seconds; after that the master should assume the worker has died (of course, it might not have).
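One possible shape for that timeout, as a background goroutine on the master; all field and constant names here are assumptions:

```go
package mr

import (
	"sync"
	"time"
)

// Illustrative task bookkeeping.
const (
	idle = iota
	inProgress
	completed
)

type taskState struct {
	Status    int
	StartTime time.Time
}

type Master struct {
	mu    sync.Mutex
	tasks []taskState
}

// reissueStalledTasks runs in its own goroutine: any task that has been
// in progress for more than ten seconds is handed back to the idle pool,
// on the assumption that its worker has died.
func (m *Master) reissueStalledTasks() {
	for {
		time.Sleep(time.Second)
		m.mu.Lock()
		for i := range m.tasks {
			t := &m.tasks[i]
			if t.Status == inProgress && time.Since(t.StartTime) > 10*time.Second {
				t.Status = idle
			}
		}
		m.mu.Unlock()
	}
}
```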
To ensure that nobody observes partially written files in the presence of crashes, the MapReduce paper mentions the trick of using a temporary file and atomically renaming it once it is completely written. You can use ioutil.TempFile to create a temporary file and os.Rename to atomically rename it.
Here the only change needed is to switch the output to a TempFile -> Rename flow, so I won't modify the code above; a sketch of the pattern follows.
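For reference, the pattern looks like this (`writeAtomically` is a made-up helper name):

```go
package mr

import (
	"io"
	"io/ioutil"
	"os"
)

// writeAtomically writes through a temporary file and renames it into
// place, so a crash mid-write never leaves a partially written output
// visible under the final name.
func writeAtomically(filename string, write func(io.Writer) error) error {
	tmp, err := ioutil.TempFile(".", "mr-tmp-*")
	if err != nil {
		return err
	}
	if err := write(tmp); err != nil {
		tmp.Close()
		os.Remove(tmp.Name())
		return err
	}
	if err := tmp.Close(); err != nil {
		return err
	}
	// os.Rename is atomic within a file system.
	return os.Rename(tmp.Name(), filename)
}
```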
test-mr.sh runs all the processes in the sub-directory mr-tmp, so if something goes wrong and you want to look at intermediate or output files, look there.
Running `sh test-mr.sh` produces the output below; all of the Lab 1 tests pass.
```
*** Starting wc test.
2021/02/03 16:24:03 rpc.Register: method "Done" has 1 input parameters; needs exactly three
--- wc test: PASS
*** Starting indexer test.
2021/02/03 16:24:09 rpc.Register: method "Done" has 1 input parameters; needs exactly three
--- indexer test: PASS
*** Starting map parallelism test.
2021/02/03 16:24:12 rpc.Register: method "Done" has 1 input parameters; needs exactly three
--- map parallelism test: PASS
*** Starting reduce parallelism test.
2021/02/03 16:24:20 rpc.Register: method "Done" has 1 input parameters; needs exactly three
--- reduce parallelism test: PASS
*** Starting crash test.
2021/02/03 16:24:28 rpc.Register: method "Done" has 1 input parameters; needs exactly three
--- crash test: PASS
*** PASSED ALL TESTS
```

The `rpc.Register` warnings are just net/rpc noting that the exported `Done` method doesn't match the RPC handler signature; they are informational and don't affect the tests.
A stock’s price fluctuates every day by going up exactly 5% or going down exactly 5%. Assume that each direction is equally likely. Assume zero trading cost, zero interest rate, and no dividends and splits. What strategy is most likely to be profitable after 100 days?
(3 min, score = 3)
A. Buying and selling are equally likely to be profitable
B. Cannot know / no strategy can be profitable
C. Buy the stock
D. Sell the stock
Solution:
D. The expected price is unchanged ($0.5 \times 1.05 + 0.5 \times 0.95 = 1$), but an up move followed by a down move multiplies the price by $1.05 \times 0.95 = 0.9975 < 1$, so a typical path drifts downward. Shorting the stock is therefore the strategy most likely to be profitable after 100 days, even though its expected profit is zero.
Below is a list of the asymptotic complexities of 8 functions, each taking an input of length N:

A. O(N^3)
B. O(Log(N))
C. O(Sqrt(N))
D. O(N * log(N))
E. O(2^N)
F. O(N^N)
G. O(N!)
H. O(Log(Log(N)))
Please sort the functions by order of growth, with slower growing functions first. (your answer shall be a sequence of letters, for example “BACDFHGE”)
(4 min, score = 3)
Solution:
HBCDAEGF (note that O(Sqrt(N)) grows more slowly than O(N * log(N)), so C precedes D)
What is the maximum possible variance of a random variable taking values in the interval [0, 10]?
(2 min, score = 2)
Solution:
Put half of the probability mass at 0 and half at 10, so that $E(X) = 5$.

$\operatorname{Var}(X) = E\big[(X - E(X))^2\big] = 5^2 = 25$
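That this is in fact the maximum follows from Popoviciu's inequality for a random variable confined to $[a, b]$:

$$
\operatorname{Var}(X) \le \frac{(b - a)^2}{4} = \frac{(10 - 0)^2}{4} = 25,
$$

with equality exactly for the half-at-each-endpoint distribution above.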
How many integers $n$ in the range $[100: 400]$ are such that $n^n$ is a perfect square?
(5 min, score = 4)
Solution:
If $n$ is even, $n = 2k$, then $\sqrt{n^n} = \sqrt{n^{2k}} = n^k$, so $n^n$ is always a perfect square.

If $n$ is odd, $n = 2k + 1$, then $\sqrt{n^n} = \sqrt{n^{2k+1}} = n^k \sqrt{n}$, so $n^n$ is a perfect square if and only if $n$ itself is a perfect square.

We have:

$10^2 = 100, \qquad 20^2 = 400,$

so the perfect squares in range are $10^2, 11^2, \ldots, 20^2$, of which 5 are odd: $11^2, 13^2, \ldots, 19^2$.

There are $(400 - 100)/2 + 1 = 151$ even integers in the range, so in total $151 + 5 = 156$ integers.
Assume there are three random variables $X, Y, Z$. All pairwise correlations are equal: $\operatorname{corr}(X,Y) = \operatorname{corr}(Y,Z) = \operatorname{corr}(X,Z) = r$. What is the range of possible values of $r$? (list a range, like $[-0.3: 0.7]$, for example)
(6 min, score = 12)
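A standard derivation of the range: the correlation matrix of $(X, Y, Z)$ must be positive semidefinite, and its eigenvalues are $1 + 2r$ and $1 - r$ (with multiplicity two), so

$$
\begin{pmatrix} 1 & r & r \\ r & 1 & r \\ r & r & 1 \end{pmatrix} \succeq 0
\;\Longleftrightarrow\;
1 + 2r \ge 0 \ \text{ and } \ 1 - r \ge 0
\;\Longleftrightarrow\;
r \in \left[-\tfrac{1}{2}: 1\right].
$$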
Assume there are three random variables $X, Y, Z$. We would like to use one number to describe their relationship, just as the pairwise correlation $\operatorname{corr}(X,Y)$ does for two variables. The number needs to be normalized. Please list possible mathematical formulas for computing such a number; the more, the better.
(6 min, score = 12)
Solution:
$\dfrac{\operatorname{corr}(X,Y) + \operatorname{corr}(Y,Z) + \operatorname{corr}(X,Z)}{3}$
Triangle ABC has sides of length 45, 60 and 75. A point X is placed uniformly at random inside the triangle. What is the expected value of the sum of the perpendicular distances from X to the triangle’s three sides?
(9 min, score = 10)
Solution:
$45^2 + 60^2 = 75^2$, so triangle ABC is a right triangle with legs 45 and 60.
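One way to finish from here: the distance from $X$ to a fixed side is an affine function of $X$, so its expectation over the uniform distribution equals the centroid's distance, which is one third of the altitude to that side. Hence:

$$
\text{Area} = \tfrac{1}{2} \cdot 45 \cdot 60 = 1350,
\qquad
h_{45} = \frac{2 \cdot 1350}{45} = 60,
\quad
h_{60} = \frac{2 \cdot 1350}{60} = 45,
\quad
h_{75} = \frac{2 \cdot 1350}{75} = 36,
$$

$$
E[d_{45} + d_{60} + d_{75}] = \frac{h_{45} + h_{60} + h_{75}}{3} = \frac{60 + 45 + 36}{3} = 47.
$$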
Please list one of your most “strange” or “crazy” ideas for predicting stock returns. You can assume you have all available public data and strong computing power. The answer shall be as “strange” as possible.
(6 min, score = 10)
Solution:
Use the stock code to select stocks. If the stock code is a prime number, buy it and hold. The strategy is pretty strange because there should be no useful information in the stock code.
This says that transforming the input $x$ with $T(\cdot)$ and then passing it through the network or layer $\Phi(\cdot)$ (i.e., applying the network to $Tx$) should give the same result as first mapping $x$ through the network and then transforming the representation (i.e., computing $\Phi(x)$ first and then applying $T'$); that is, $\Phi(Tx) = T'(\Phi(x))$. Note that $T'(\cdot)$ and $T(\cdot)$ are not necessarily the same. According to this theory, it is reasonable to compress whole network models by applying the transform to layers or filters $\Phi(\cdot)$. From empirical observation, deep CNNs also benefit from using a large set of convolutional filters built by applying a certain transform $T(\cdot)$ to a small set of base filters, since this acts as a regularizer for the model.
```python
import torch
import torch.nn as nn
import torch.nn.functional as F


class Identify(nn.Module):
    """Identity mapping, used as a pass-through module."""
    def __init__(self):
        super().__init__()

    def forward(self, x):
        return x


class L2Normalization(nn.Module):
    """L2-normalize the input along the given dimension."""
    def __init__(self, p=2, dim=-1):
        super().__init__()
        self.p = p
        self.dim = dim

    def forward(self, x):
        # normalize with the configured norm along the configured dimension
        out = F.normalize(x, p=self.p, dim=self.dim)
        return out


class GlobalDescriptor(nn.Module):
    """Global pooling descriptor: 's' = average pooling, 'm' = max pooling,
    anything else = generalized-mean (GeM) pooling with exponent p."""
    def __init__(self, pooling_type, trainable=False):
        super().__init__()
        # GeM exponent: learnable if trainable, otherwise fixed at 3
        if trainable:
            self.p = nn.Parameter(torch.tensor([3.]))
        else:
            self.p = 3
        if pooling_type == 's':
            self.method = nn.AdaptiveAvgPool2d(1)
        elif pooling_type == 'm':
            self.method = nn.AdaptiveMaxPool2d(1)
        else:
            def GeM(x):
                # mean of x^p over the spatial dims, then the p-th root;
                # sign/abs keep the root well-defined for negative means
                mean_value = torch.mean(torch.pow(x, self.p), dim=[-1, -2], keepdim=True)
                out = torch.sign(mean_value) * torch.pow(torch.abs(mean_value), 1 / self.p)
                return out
            self.method = GeM
        self.flatten = nn.Flatten()

    def forward(self, x):
        # pool (N, C, H, W) down to (N, C, 1, 1), then flatten to (N, C)
        out = self.method(x)
        out = self.flatten(out)
        return out
```