https://soulmachine.gitbooks.io/system-design/content/cn/bigdata/data-stream-sampling.html
https://stackoverflow.com/questions/6409652/random-weighted-selection-in-java
有一个无限的整数数据流,如何从中随机地抽取k个整数出来?
这是一个经典的数据流采样问题,我们一步一步来分析。
当k=1时
我们先考虑最简单的情况,k=1,即只需要随机抽取一个样本出来。抽样方法如下:
- 当第一个整数到达时,保存该整数
- 当第2个整数到达时,以1/2的概率使用该整数替换第1个整数,以1/2的概率丢弃改整数
- 当第i个整数到达时,以的概率使用第i个整数替换被选中的整数,以的概率丢弃第i个整数
假设数据流目前已经流出共n个整数,这个方法能保证每个元素被选中的概率是吗?用数学归纳法,证明如下:
- 当n=1时,由于是第1个数,被选中的概率是100%,命题成立
- 假设当n=m(m>=1)时,命题成立,即前m个数,每一个被选中的概率是
- 当n=m+1时,第m+1个数被选中的概率是 , 前m个数被选中的概率是,命题依然成立
由1,2,3知n>=1时命题成立,证毕。
当k>1时
当 k > 1,需要随机采样多个样本时,方法跟上面很类似,
- 前k个整数到达时,全部保留,即被选中的概率是 100%,
- 第i个整数到达时,以的概率替换k个数中的某一个,以的概率丢弃,保留k个数不变
假设数据流目前已经流出共N个整数,这个方法能保证每个元素被选中的概率是吗?用数学归纳法,证明如下:
- 当n=m(m<=k)时,被选中的概率是100%,命题成立
- 假设当n=m(m>k)时,命题成立,即前m个数,每一个被选中的概率是
- 当n=m+1时,第m+1个数被选中的概率是 , 前m个数被选中的概率是,命题依然成立
由1,2,3知n>=1时命题成立,证毕。
http://www.cnblogs.com/datacloud/p/3588120.html
在开发和调试代码的时候,没有必要处理整个数据集。但如果在这种情况下要保证数据集能够被正确地处理,就需要用到抽样了。抽样是统计学中的一个方法。它通过一定的过程从整个数据中抽取出一个子数据集。这个子数据集能够代表整体数据集的数据分布状况。在MapReduce中,开发人员可以只针对这个子数据集进行开发调试,极大减小了系统负担,提高了开发效率。
技术23 水塘抽样(Reservoir sampling)
假设如下场景:在开发一个MapReduce作业的时候,需要反复不断地去测试一个超大数据集。当然,处理这个数据集很费时间,想要快速开发几乎不可能。
问题
在开发MapReduce作业的时候,如何能够只用处理超大数据集的一个小小的子集?
方案
在读取数据的那部分,自定义一个InputFormat来封装默认的InputFormat。在自定义的InputFormat中,将从默认的InputFormat中得到的数据按一定比例进行抽样。
讨论
由于水塘抽样可以从数据流中随机采样,它就特别适合于MapReduce。在MapReduce中,数据源的形式就是数据流。图4.16说明了水塘抽样的算法。
由于水塘抽样可以从数据流中随机采样,它就特别适合于MapReduce。在MapReduce中,数据源的形式就是数据流。图4.16说明了水塘抽样的算法。
这里需要实现ReservoirSamplerRecordReader类来封装默认的InputFormat类和RecordReader类。InputFormat类的作用是对输入进行分块。RecordReader类的作用是读取记录。抽样功能则在ReservoirSamplerRecordReader类中实现。图4.17说明了ReservoirSamplerRecordReader类的工作机制。
1 public static class ReservoirSamplerRecordReader<K extends Writable, V extends Writable> extends RecordReader { 2 3 private final RecordReader<K, V> rr; 4 private final int numSamples; 5 private final int maxRecords; 6 private final ArrayList<K> keys; 7 private final ArrayList<V> values; 8 9 @Override 10 public void initialize(InputSplit split,TaskAttemptContext context) 11 throws IOException, InterruptedException { 12 13 rr.initialize(split, context); 14 Random rand = new Random(); 15 16 for (int i = 0; i < maxRecords; i++) { 17 if (!rr.nextKeyValue()) { 18 break; 19 } 20 21 K key = rr.getCurrentKey(); 22 V val = rr.getCurrentValue(); 23 24 if (keys.size() < numSamples) { 25 keys.add(WritableUtils.clone(key, conf)); 26 values.add(WritableUtils.clone(val, conf)); 27 } else { 28 int r = rand.nextInt(i); 29 if (r < numSamples) { 30 keys.set(r, WritableUtils.clone(key, conf)); 31 values.set(r, WritableUtils.clone(val, conf)); 32 } 33 } 34 } 35 } 36 ...
https://stackoverflow.com/questions/6409652/random-weighted-selection-in-java
public class RandomCollection<E> {
private final NavigableMap<Double, E> map = new TreeMap<Double, E>();
private final Random random;
private double total = 0;
public RandomCollection() {
this(new Random());
}
public RandomCollection(Random random) {
this.random = random;
}
public RandomCollection<E> add(double weight, E result) {
if (weight <= 0) return this;
total += weight;
map.put(total, result);
return this;
}
public E next() {
double value = random.nextDouble() * total;
return map.higherEntry(value).getValue();
}
}
interface Item {
double getWeight();
}
class RandomItemChooser {
public Item chooseOnWeight(List<Item> items) {
double completeWeight = 0.0;
for (Item item : items)
completeWeight += item.getWeight();
double r = Math.random() * completeWeight;
double countWeight = 0.0;
for (Item item : items) {
countWeight += item.getWeight();
if (countWeight >= r)
return item;
}
throw new RuntimeException("Should never be shown.");
}
}