InputFormat牛逼(6): org.apache.hadoop.mapreduce.lib.db.DBRecordReader
InputFormat牛逼(6)org.apache.hadoop.mapreduce.lib.db.DBRecordReader<T>
@Public
@Evolving
A RecordReader that reads records from a SQL table. Emits LongWritables containing the record number as key and DBWritables as value.
@Public
@Evolving
A RecordReader that reads records from a SQL table. Emits LongWritables containing the record number as key and DBWritables as value.
@InterfaceAudience.Public @InterfaceStability.Evolving public class DBRecordReader<T extends DBWritable> extends RecordReader<LongWritable, T> { private static final Log LOG = LogFactory.getLog(DBRecordReader.class); private ResultSet results = null; private Class<T> inputClass; private Configuration conf; private DBInputFormat.DBInputSplit split; private long pos = 0; private LongWritable key = null; private T value = null; private Connection connection; protected PreparedStatement statement; private DBConfiguration dbConf; private String conditions; private String [] fieldNames; private String tableName; /** * @param split The InputSplit to read data for * @throws SQLException */ public DBRecordReader(DBInputFormat.DBInputSplit split, Class<T> inputClass, Configuration conf, Connection conn, DBConfiguration dbConfig, String cond, String [] fields, String table) throws SQLException { this.inputClass = inputClass; this.split = split; this.conf = conf; this.connection = conn; this.dbConf = dbConfig; this.conditions = cond; this.fieldNames = fields; this.tableName = table; } protected ResultSet executeQuery(String query) throws SQLException { this.statement = connection.prepareStatement(query, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); return statement.executeQuery(); } /** Returns the query for selecting the records, * subclasses can override this for custom behaviour.*/ protected String getSelectQuery() { StringBuilder query = new StringBuilder(); // Default codepath for MySQL, HSQLDB, etc. Relies on LIMIT/OFFSET for splits. 
if(dbConf.getInputQuery() == null) { query.append("SELECT "); for (int i = 0; i < fieldNames.length; i++) { query.append(fieldNames[i]); if (i != fieldNames.length -1) { query.append(", "); } } query.append(" FROM ").append(tableName); query.append(" AS ").append(tableName); //in hsqldb this is necessary if (conditions != null && conditions.length() > 0) { query.append(" WHERE (").append(conditions).append(")"); } String orderBy = dbConf.getInputOrderBy(); if (orderBy != null && orderBy.length() > 0) { query.append(" ORDER BY ").append(orderBy); } } else { //PREBUILT QUERY query.append(dbConf.getInputQuery()); } try { query.append(" LIMIT ").append(split.getLength()); query.append(" OFFSET ").append(split.getStart()); } catch (IOException ex) { // Ignore, will not throw. } return query.toString(); } /** {@inheritDoc} */ public void close() throws IOException { try { if (null != results) { results.close(); } if (null != statement) { statement.close(); } if (null != connection) { connection.commit(); connection.close(); } } catch (SQLException e) { throw new IOException(e.getMessage()); } } public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { //do nothing } /** {@inheritDoc} */ public LongWritable getCurrentKey() { return key; } /** {@inheritDoc} */ public T getCurrentValue() { return value; } /** * @deprecated */ @Deprecated public long getPos() throws IOException { return pos; } /** {@inheritDoc} */ public float getProgress() throws IOException { return pos / (float)split.getLength(); } /** {@inheritDoc} */ public boolean nextKeyValue() throws IOException { try { if (key == null) { key = new LongWritable(); } if (value == null) { value = createValue(); } if (null == this.results) { // First time into this method, run the query. 
this.results = executeQuery(getSelectQuery()); } if (!results.next()) return false; // Set the key field value as the output key value key.set(pos + split.getStart()); value.readFields(results); pos ++; } catch (SQLException e) { throw new IOException("SQLException in nextKeyValue", e); } return true; } //... ... }