【ODPS】利用阿里云ODPS作业进行圆周率Pi的计算

【ODPS】利用阿里云ODPS作业进行圆周率Pi的计算

原理:

1.画一个正方形,边长为1CM,在此正方形内绘制一个内接圆。
2.假如我们在此正方形内随机点一个点,这个点落在圆内的概率是P
3.假如我们随机生成足够多的点,那么落在圆内的点所占的比例就会无限接近于概率P,而P=圆的面积/正方形的面积=(Pi*0.5*0.5)/(1*1)=Pi/4,因此Pi≈4*P

【ODPS】利用阿里云ODPS作业进行圆周率Pi的计算

表设计:

1.随机落点表(记录随机点的位置)
2.我们以左下角坐标为(0,0)原点坐标
3.生成随机数如下:


java源码:

1.创建point表

package bysql;

import com.aliyun.odps.Column;
import com.aliyun.odps.Odps;
import com.aliyun.odps.OdpsType;
import com.aliyun.odps.Table;
import com.aliyun.odps.TableSchema;
import com.aliyun.odps.Tables;

public class CreateTable {

	/**
	 * Recreates the table used to hold the random points.
	 *
	 * <p>If a table named {@code createTableName} already exists, its name and
	 * owner are printed and the table is deleted. The table is then created
	 * with two DOUBLE columns, {@code x} and {@code y}, unless it still exists
	 * after the delete step. Progress messages are written to standard output.
	 *
	 * @param odps            ODPS entry point used to reach the table collection
	 * @param createTableName name of the table to drop (if present) and create
	 * @throws Exception if any of the ODPS table operations fails
	 */
	public static void createTable(Odps odps,String createTableName) throws Exception {
		final String separator = "-------------------------------------------------";

		// Collection object representing the tables reachable through this Odps instance.
		Tables allTables = odps.tables();

		/* Step 1: drop a pre-existing table so every run starts from empty data. */
		if (!allTables.exists(createTableName)) {
			System.out.println("指定表不存在");
		} else {
			System.out.println("指定表存在");
			Table existing = allTables.get(createTableName);
			System.out.println("指定表信息为:【name:】" + existing.getName() + "【Owner:】"
					+ existing.getOwner());
			allTables.delete(createTableName);
		}
		System.out.println(separator);

		/* Step 2: create the table, unless it is (unexpectedly) still present. */
		if (allTables.exists(createTableName)) {
			System.out.println("指定表存在,无法创建");
			System.out.println(separator);
			return;
		}
		System.out.println("指定表不存在,可以创建");

		// The schema consists of the two coordinates of a point.
		TableSchema pointSchema = new TableSchema();
		pointSchema.addColumn(new Column("x", OdpsType.DOUBLE, "X"));
		pointSchema.addColumn(new Column("y", OdpsType.DOUBLE, "Y"));

		allTables.create(createTableName, pointSchema);
		System.out.println("表【" + createTableName + "】创建成功");
		System.out.println(separator);
	}

}


2.随机模拟数据(利用多线程)

package bysql;
import java.io.IOException;
import java.util.Random;
import java.util.concurrent.Callable;
import com.aliyun.odps.data.Record;
import com.aliyun.odps.data.RecordWriter;


/**
 * Upload task that writes a fixed number of random points through one
 * {@link RecordWriter} and then closes that writer.
 *
 * <p>Each point is a pair of values drawn from {@link Random#nextDouble()},
 * stored in the {@code x} and {@code y} fields of the supplied record.
 */
class UploadThread implements Callable<Boolean> {
    /** Number of records written by a single task. */
    private static final int RECORDS_PER_TASK = 10000;

    /** Identifier handed in by the caller; not read by this class. */
    private final long id;
    private final RecordWriter recordWriter;
    /** Single record instance that is refilled and rewritten for every point. */
    private final Record record;
    /** Random source shared by all tasks, seeded once from the wall clock. */
    static Random random = new Random(System.currentTimeMillis());

    /**
     * @param id           identifier of this task
     * @param recordWriter writer the points are written to; closed by {@link #call()}
     * @param record       reusable record with double fields {@code x} and {@code y}
     */
    public UploadThread(long id, RecordWriter recordWriter, Record record) {
        this.id = id;
        this.recordWriter = recordWriter;
        this.record = record;
    }

    /**
     * Writes {@value #RECORDS_PER_TASK} random points and closes the writer.
     *
     * @return {@code true} once all records were written and the writer closed
     * @throws IOException if writing a record or closing the writer fails
     */
    @Override
    public Boolean call() throws IOException {
        try {
            for (int written = 0; written < RECORDS_PER_TASK; written++) {
                record.setDouble("x", getRandomDouble());
                record.setDouble("y", getRandomDouble());
                recordWriter.write(record);
            }
        } finally {
            // Close even when a write fails so the writer is never leaked.
            recordWriter.close();
        }
        return true;
    }

    /** Returns the next value from the shared random source. */
    private double getRandomDouble() {
        return random.nextDouble();
    }
}
package bysql;

import java.util.ArrayList;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import com.aliyun.odps.Column;
import com.aliyun.odps.Odps;
import com.aliyun.odps.OdpsType;
import com.aliyun.odps.data.ArrayRecord;
import com.aliyun.odps.data.Record;
import com.aliyun.odps.data.RecordWriter;
import com.aliyun.odps.tunnel.TableTunnel;

public class InitData {
	/**
	 * Fills the given table with random points using a pool of upload tasks.
	 *
	 * <p>One upload session is created for the table. Each of the 100 tasks
	 * gets its own record writer (opened with the task index as block id) and
	 * its own record, and is run on a fixed-size thread pool. The session is
	 * committed only after every task has finished successfully; if any task
	 * failed, its exception is rethrown and nothing is committed.
	 *
	 * @param odps    ODPS entry point used to build the tunnel
	 * @param project name of the project that owns the table
	 * @param table   name of the table to upload into
	 * @throws Exception if the session cannot be created, a task fails
	 *                   (surfaced as an {@code ExecutionException}), the calling
	 *                   thread is interrupted, or the commit fails
	 */
	public static void uploadDataToYun(Odps odps, String project, String table)
			throws Exception {
		TableTunnel tunnel = new TableTunnel(odps);
		// Explicit tunnel server address. Per the original author's note, one is
		// selected automatically when none is set - TODO confirm against the SDK docs.
		tunnel.setEndpoint("http://dt.odps.aliyun.com");
		TableTunnel.UploadSession uploadSession = tunnel.createUploadSession(
				project, table);
		// Number of upload tasks, which is also the thread pool size.
		int threadNum = 100;
		long startTime = System.currentTimeMillis();
		System.out.println("正在上传数据.............");
		ExecutorService pool = Executors.newFixedThreadPool(threadNum);
		try {
			ArrayList<Callable<Boolean>> callers = new ArrayList<Callable<Boolean>>();
			for (int i = 0; i < threadNum; i++) {
				RecordWriter rw = uploadSession.openRecordWriter(i);
				// Every task gets its own record so tasks never share mutable state.
				Column[] columns = new Column[2];
				columns[0] = new Column("x", OdpsType.DOUBLE);
				columns[1] = new Column("y", OdpsType.DOUBLE);
				Record r = new ArrayRecord(columns);

				callers.add(new UploadThread(i, rw, r));
			}
			// invokeAll blocks until all tasks are done, but it does not report
			// their failures; get() rethrows a task's exception so that a partial
			// upload is never committed silently.
			for (java.util.concurrent.Future<Boolean> outcome : pool.invokeAll(callers)) {
				outcome.get();
			}
		} finally {
			// Release the worker threads on both the success and the failure path.
			pool.shutdown();
		}

		Long[] blocks = uploadSession.getBlockList();
		uploadSession.commit(blocks);

		System.out.println("数据上传完毕!");
		long endTime = System.currentTimeMillis();
		System.out.println("总共耗时:" + (endTime - startTime) + " ms");
		System.out.println("-------------------------------------------------");
	}
}


3.求圆周率pi

1)执行SQL作业查找在圆内的点的个数A1
 select count(*) from point where ((x-0.5)*((x-0.5))+(y-0.5)*(y-0.5) <=0.5*0.5);
2)执行SQL作业查询所有的点的个数A2
 select count(*) from point ;
3)计算点出现在圆内的概率
 P=A1/A2
4)计算圆周率Pi
 Pi=4P

package bysql;

import java.util.Map;
import com.aliyun.odps.Instance;
import com.aliyun.odps.Odps;
import com.aliyun.odps.OdpsException;
import com.aliyun.odps.account.Account;
import com.aliyun.odps.account.AliyunAccount;
import com.aliyun.odps.task.SQLTask;

/**
 * Entry point of the demo: recreates the point table, uploads random points,
 * counts all points and the points inside the inscribed circle with two SQL
 * jobs, and prints the resulting estimate of Pi.
 */
public class GetPI {

	private static final String ACCESS_ID = "*****************";
	private static final String ACCESS_KEY = "**********************";
	private static final String PROJECT_NAME = "****************";
	private static final String ODPS_URL = "http://service.odps.aliyun.com/api";

	/**
	 * Runs the whole pipeline and prints progress, timings and the estimate.
	 *
	 * @param args unused
	 * @throws Exception if table creation, upload or a SQL job fails
	 */
	public static void main(String args[]) throws Exception {

		/* Build the Aliyun account first. */
		Account account = new AliyunAccount(ACCESS_ID, ACCESS_KEY);
		System.out.println("Account Type:" + account.getType());
		System.out.println("-------------------------------------------------");

		/* The Odps class is the entry point of the ODPS SDK. */
		Odps odps = new Odps(account);
		odps.setDefaultProject(PROJECT_NAME);// project used by default
		odps.setEndpoint(ODPS_URL);// address of the ODPS service

		String tableName = "point";

		/* Create the table. */
		CreateTable.createTable(odps,tableName);

		/* Upload the random points. */
		InitData.uploadDataToYun(odps, PROJECT_NAME, tableName);

		/* Count all points. */
		long startTime = System.currentTimeMillis();
		String sql = "select count(*) from point ;";
		System.out.println("正在查询当前point总数...");
		String ret = excuteSql(odps, sql);
		System.out.println("总数:"+ret);
		long endTime = System.currentTimeMillis();
		System.out.println("总共耗时:" + (endTime - startTime) + " ms");
		System.out.println("-------------------------------------------------");

		/* Count the points whose distance to the centre (0.5, 0.5) is at most 0.5. */
		startTime = System.currentTimeMillis();
		System.out.println("正在查询命中的point总数...");
		sql = "select count(*) from point where ((x-0.5)*((x-0.5))+(y-0.5)*(y-0.5) <=0.5*0.5);";
		String ret2 = excuteSql(odps, sql);
		System.out.println("命中数:"+ret2);
		endTime = System.currentTimeMillis();
		System.out.println("总共耗时:" + (endTime - startTime) + " ms");
		System.out.println("-------------------------------------------------");

		/* Turn the two counts into the estimate. */
		String pi =  calculate(ret, ret2);
		System.out.println("Pi=" + pi);
	}

	/**
	 * Runs one SQL statement and returns the concatenated task results.
	 *
	 * @param odps ODPS entry point the job is submitted through
	 * @param sql  SQL statement to run
	 * @return the result strings of all tasks of the instance, concatenated in
	 *         the iteration order of the task status map
	 * @throws OdpsException if submitting or waiting for the job fails
	 */
	public static String excuteSql(Odps odps, String sql) throws OdpsException {
		/* An Instance represents one running computation in ODPS. */
		Instance instance = SQLTask.run(odps, sql);

		// Block the current thread until the instance has finished.
		instance.waitForSuccess();

		/*
		 * Task results are keyed by task name; the status map supplies the
		 * task names to look up.
		 */
		Map<String, String> results = instance.getTaskResults();
		Map<String, Instance.TaskStatus> taskStatus = instance.getTaskStatus();
		StringBuilder combined = new StringBuilder();
		for (Map.Entry<String, Instance.TaskStatus> status : taskStatus.entrySet()) {
			combined.append(results.get(status.getKey()));
		}
		return combined.toString();
	}

	/**
	 * Estimates Pi as {@code 4 * hits / total}.
	 *
	 * <p>Both arguments are expected to be multi-line SQL results whose second
	 * line holds the count (presumably a header line followed by the value -
	 * verify against the actual SQLTask output format). The arithmetic is done
	 * in {@code double}, because {@code float} carries only about seven
	 * significant digits and would make the last printed decimals unreliable.
	 *
	 * @param ret  raw result of the total-count query
	 * @param ret2 raw result of the in-circle-count query
	 * @return the estimate formatted with {@code "%f"}, or {@code null} when
	 *         either result does not contain a second line
	 * @throws NumberFormatException if a second line is not a number
	 */
	private static String calculate(String ret, String ret2) {
		if (ret.indexOf("\n") > 0 && ret2.indexOf("\n") > 0) {
			String[] totalLines = ret.split("\n");
			String[] hitLines = ret2.split("\n");
			// split() drops trailing empty strings, so "header\n" yields a single
			// element; treat that as "no value" instead of indexing out of bounds.
			if (totalLines.length < 2 || hitLines.length < 2) {
				return null;
			}
			double total = Double.parseDouble(totalLines[1]);
			double hits = Double.parseDouble(hitLines[1]);
			double pi = hits / total * 4;
			return String.format("%f", pi);
		}
		return null;
	}

}


结果:

【ODPS】利用阿里云ODPS作业进行圆周率Pi的计算




版权声明:本文为博主原创文章,未经博主允许不得转载。

1楼fuli_mouren昨天 18:11
赞一个。