Using Fastexport with TdRedux

NOTE: The implementation described herein does not implement restartable Export jobs due to

While it is possible to implement restartable Exports with TdRedux, the Export support provided is intended for moderately sized "convenience" workloads rather than large-scale exports (e.g., terabyte exports).

Also note that this implementation has not been tested with complex or parameterized SQL. A future release may alleviate this restriction.

Using Fastexport with TdRedux requires some special programming considerations. In brief:

  1. Log on a regular DBC/SQL session with the ";lsn=0" connection URL parameter; this connection is referred to as the "control connection" (or "control session").
  2. Create a Statement object on the control connection via Connection.createStatement() (known as the "control Statement").
  3. Create a PreparedStatement on the control session using the SELECT statement to be exported.
  4. Log on one or more Export sessions by setting the ";partition=EXPORT" URL parameter AND setting the ";lsn=N" URL parameter, where N is the LSN returned from the control session via the tdrdxGetLSN() method.
  5. Retrieve the ResultSetMetaData from the PreparedStatement; it will be used by the Export sessions to "clone" the SELECT request ResultSetMetaData.
  6. Turn off AutoCommit on the control session.
  7. Using the control Statement, call executeUpdate("BEGIN FASTEXPORT;").
  8. On the control session's PreparedStatement, call executeQuery().
  9. On each Export session, create a PreparedStatement using a NULL statement (i.e., ";").
  10. Bind 3 pseudo-parameters to the Export PreparedStatement as single-element arrays. These parameters are:
    1. query-number: the number of the query to be exported; must be 1. (Multistatement queries may be supported in a future release.)
    2. segment-number: the number of the first segment this export session will retrieve.
    3. segment-increment: the value automatically added to each export session's segment-number parameter when the current segment has been received and the request for the next segment is triggered. This value is usually the number of export sessions logged on.

    TdRedux internally uses the segment-number and segment-increment to automatically "continue" export sessions so that the execution of the NULL PreparedStatement appears to return results in a single continuous stream, rather than discrete segments (as returned by the database).

  11. Call executeQuery() on each Export session's NULL PreparedStatement.
  12. Call tdrdxCopyMetaData(ResultSetMetaData) on the ResultSet object returned from the preceding executeQuery(), passing the ResultSetMetaData retrieved from the control session's PreparedStatement for the SELECT export statement.
  13. (Optionally) bind column arrays to the Export session's ResultSet.
  14. Repeatedly call next() on each Export session's ResultSet and retrieve the column data as normal, or via the column array binding interface, until next() returns false, indicating the export session has exhausted its set of segments.
  15. When all the Export sessions report the end-of-request, call executeUpdate("END FASTEXPORT;") on the control Statement, followed by a commit() on the control connection, to finalize the Export.
  16. Log off all the Export sessions.
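
To make the segment-number and segment-increment parameters concrete, the following sketch simulates how the segments of one export might be divided among sessions. It does not use TdRedux at all: segmentsFor() is a hypothetical stand-in for the driver's internal "continue" logic as described above, and totalSegments is an assumption made for illustration (a real export session simply stops when the database reports no more segments). The single-element arrays mimic the bind semantic, so the caller sees the advancing segment number in seqnum[0].

```java
import java.util.ArrayList;
import java.util.List;

public class SegmentInterleave {

    // Hypothetical stand-in for the driver's auto-continue behavior:
    // after each segment is received, the bound increment is added to the
    // bound segment number. totalSegments exists only for this simulation.
    static List<Integer> segmentsFor(int[] seqnum, int[] increment, int totalSegments) {
        List<Integer> fetched = new ArrayList<>();
        while (seqnum[0] <= totalSegments) {
            fetched.add(seqnum[0]);
            seqnum[0] += increment[0];   // visible to the caller through the array
        }
        return fetched;
    }

    // Session i (0-based) starts at segment i + 1 and steps by the number of
    // sessions, the same "i+1, numsess" values the control example passes
    // to each ExportThread.
    static List<List<Integer>> plan(int numSessions, int totalSegments) {
        List<List<Integer>> plan = new ArrayList<>();
        for (int i = 0; i < numSessions; i++) {
            int[] seqnum = { i + 1 };
            int[] increment = { numSessions };
            plan.add(segmentsFor(seqnum, increment, totalSegments));
        }
        return plan;
    }

    public static void main(String[] args) {
        List<List<Integer>> plan = plan(2, 7);
        for (int i = 0; i < plan.size(); i++) {
            System.out.println("session " + i + " -> segments " + plan.get(i));
        }
        // session 0 -> segments [1, 3, 5, 7]
        // session 1 -> segments [2, 4, 6]
    }
}
```

With the increment equal to the number of sessions, every segment is requested by exactly one session, which is why that value is the usual choice.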

The following example illustrates a simple Export. By creating multiple ExportThread objects, an efficient parallel Fastexport job can be run.

Example Export Control method



    static void fexptest() {
        System.out.println("Test fastexport...");

        int numsess = 2;
        int lsn = -1;
        com.presicient.tdredux.Connection ctlcon = null;
        java.sql.Connection fexcon[] = new com.presicient.tdredux.Connection[numsess];
        ExportThread fexthread[] = new ExportThread[numsess];
        java.sql.Statement ctlstmt = null;
        java.sql.ResultSet rs = null;
        Properties props = null;
//
//    create the control session
//
        try {
            System.out.println("Logging on control session..");
            ctlcon = (com.presicient.tdredux.Connection)DriverManager.getConnection(myURL + ";lsn=0");
            lsn = ctlcon.tdrdxGetLSN();
            ctlstmt = ctlcon.createStatement();
        }
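        catch (SQLException SQLE) {
            System.out.println(SQLE.getMessage());
//    any failure here leaves the control Statement unset, so give up
            try {
                if (ctlcon != null) ctlcon.close();
            }
            catch (SQLException E) { }
            return;
        }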
//
//    prepare the fastexport SELECT now since we can't do it after
//    BEGIN FASTEXPORT; besides, this gives us the column names
//    and types
//
        com.presicient.tdredux.PreparedStatement expstmt = null;
        try {
            expstmt = (com.presicient.tdredux.PreparedStatement)ctlcon.prepareStatement(
                "SELECT * from fltypetst");
            expstmt.tdrdxSetKeepResp(true);
        }
        catch (SQLException SQLE) {
            System.out.println("Query prepare failed: " + SQLE.getMessage());
            try {
                ctlcon.close();
            }
            catch (SQLException E) { }
            return;
        }
//
//    log on N EXPORT sessions, specifying the control session's LSN value
//    for the lsn URL parameter and 'EXPORT' for the partition URL parameter
//
        int i = 0;
        try {
            for (i = 0; i < numsess; i++) {
                System.out.println("Logging on EXPORT session " + i + "..");
                fexcon[i] = DriverManager.getConnection(
                    myUtilURL + ";partition=EXPORT;lsn=" + lsn);
                fexcon[i].setAutoCommit(false);
            }
        }
        catch (SQLException SQLE) {
            System.out.println(SQLE.getMessage());
//
//    the all amps full error (2632) only limits the number of sessions;
//    any other error is fatal, so clean up and bail out
//
            if (SQLE.getErrorCode() != 2632) {
                try {
                    for (int j = 0; j < i; j++) fexcon[j].close();
                    ctlcon.close();
                } catch (SQLException S) { }
                return;
            }
        }
        numsess = i;
        System.out.println(numsess + " total EXPORT sessions logged on.");
//
//    send BEGIN FASTEXPORT; on control session
//
        try {
            ctlcon.setAutoCommit(false);
            ctlstmt.executeUpdate("BEGIN FASTEXPORT;");
        }
        catch (SQLException SQLE) {
            System.out.println( "BEGIN EXPORT failed: " + SQLE.getMessage());
            try {
                for (int j = 0; j < numsess; j++) fexcon[j].close();
                ctlcon.close();
            } catch (SQLException S) { }
            return;
        }
//
//    Execute fastexport query on control session
//
        long festarted = System.currentTimeMillis();

        try {
            rs = expstmt.executeQuery();
        }
        catch (SQLException SQLE) {
            System.out.println( "Query execute failed: " + SQLE.getMessage());
            try {
                for (int j = 0; j < numsess; j++) fexcon[j].close();
                ctlcon.close();
            } catch (SQLException S) { }
            return;
        }
//
//    start each export thread
//
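        for (i = 0; i < numsess; i++) {
            try {
                fexthread[i] = new ExportThread((com.presicient.tdredux.Connection)fexcon[i],
                    (com.presicient.tdredux.ResultSetMetaData)expstmt.getMetaData(), i+1, numsess);
                fexthread[i].setName("FEXP" + i);
            }
            catch (SQLException SQLE) {
//    without the metadata no export thread can run, so clean up and bail out
                System.out.println("Can't create export thread: " + SQLE.getMessage());
                try {
                    for (int j = 0; j < numsess; j++) fexcon[j].close();
                    ctlcon.close();
                } catch (SQLException S) { }
                return;
            }
        }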
        for (i = 0; i < numsess; i++)
            fexthread[i].start();
//
//    wait for them to complete
//
        for (i = 0; i < numsess; i++) {
            try {
                fexthread[i].join();
                System.out.println("Export thread " + i + " finished with result " +
                    fexthread[i].getResult());
            }
            catch (InterruptedException IE) { }
        }
        festarted = (System.currentTimeMillis() - festarted)/1000;
        System.out.println("Export completed in " + festarted + " secs...");
        System.out.println("Export complete, finishing up...");
//
//    wrap up the Export
//
        try {
            ctlstmt.executeUpdate("END FASTEXPORT;");
            ctlcon.commit();
            for (i = 0; i < numsess; i++)
                fexcon[i].close();
            ctlcon.close();
        }
        catch (SQLException SQLE) {
            System.out.println(SQLE.getMessage());
        }
        System.out.println("Fastexport OK.");
    }
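
The control method repeats the same cleanup loop in several catch blocks, and in each of them a failed close() on one session skips the remaining sessions and the control connection. A minimal sketch of a best-effort helper is shown here; QuietClose and closeAll() are hypothetical names, not part of TdRedux. It relies only on the fact that java.sql.Connection, Statement and ResultSet extend AutoCloseable (Java 7 and later).

```java
public class QuietClose {

    // Close every non-null resource, ignoring individual failures, and
    // return how many were closed cleanly.
    static int closeAll(AutoCloseable... resources) {
        int closed = 0;
        for (AutoCloseable r : resources) {
            if (r == null) continue;      // e.g. a session that never logged on
            try {
                r.close();
                closed++;
            } catch (Exception e) {
                // best effort: keep going so the remaining sessions still get closed
            }
        }
        return closed;
    }

    public static void main(String[] args) {
        // Stand-ins for connections: one that closes normally, one that fails.
        AutoCloseable ok = () -> System.out.println("closed");
        AutoCloseable bad = () -> { throw new IllegalStateException("already gone"); };
        System.out.println(closeAll(ok, null, bad, ok) + " closed cleanly");
    }
}
```

In the control method, a call such as closeAll(fexcon) followed by closeAll(ctlcon) could then stand in for each hand-written cleanup loop, assuming the code is compiled for Java 7 or later.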



Example Export Thread


import java.io.*;
import java.util.*;
import java.math.*;
import java.sql.*;
import com.presicient.tdredux.*;

//
//    Test harness for TdRedux Fastexport
//
public class ExportThread extends Thread {

    private com.presicient.tdredux.Connection conn = null;
    private int count = 0;
    private int seqnum[] = { 1 };
    private int qnum[] = { 1 };
    private int increment[] = { 1 };
    private String errmsg = "OK";
    private int errcount = 0;
    com.presicient.tdredux.ResultSetMetaData rsmd = null;

    public ExportThread(com.presicient.tdredux.Connection conn,
        com.presicient.tdredux.ResultSetMetaData rsmd, int first_seg, int seg_incr) {
        this.conn = conn;
        this.rsmd = rsmd;
        seqnum[0] = first_seg;
        increment[0] = seg_incr;
    }

    public String getResult() {
        return (errcount == 0) ? "OK. Recv'd " + count + " rows." :
            "ERROR: " + errmsg + " after " + count + " rows.";
    }

    public void run() {
        com.presicient.tdredux.PreparedStatement fexstmt = null;
        com.presicient.tdredux.ResultSet fexrs = null;
        byte[][] rows = new byte[100][];
        int total = 0;
        try {
//
//    prepare a NULL statement to initiate the EXPORT
//
            fexstmt = (com.presicient.tdredux.PreparedStatement)conn.prepareStatement(";");
//
//    3 pseudo-params need to be bound: the query number, the sequence number,
//    the sequence number increment
//    use a "bind" semantic to support auto-continue of export sessions
//    Note that the seqnum is auto-incremented internally within a mutex
//
            fexstmt.tdrdxBindIntArray(1, qnum);
            fexstmt.tdrdxBindIntArray(2, seqnum);
            fexstmt.tdrdxBindIntArray(3, increment);
            fexrs = (com.presicient.tdredux.ResultSet)fexstmt.executeQuery();
//
//    copy metadata from the control query
//
            fexrs.tdrdxCopyMetaData(rsmd);
//
//    bind array of byte arrays to column 0 to indicate raw row fetch
//
            fexrs.tdrdxBindBinaryArray(0, rows);
        }
        catch (SQLException SQLE) {
            errcount = 1;
            errmsg = "Can't prepare on export thread " + this.getName() + ":" +
                SQLE.getMessage();
            return;
        }

        try {
            int rowsrcvd = 0;
            int curseg = 0;
            while (fexrs.next()) {
                if (curseg != seqnum[0]) {
                    curseg = seqnum[0];
                    System.out.println(this.getName() + " on segment " + curseg);
                }
                rowsrcvd = fexrs.tdrdxGetRowsFetched();
                if (rowsrcvd == 0)
                    break;
                count += rowsrcvd;
            }
        }
        catch (SQLException SQLE) {
            if (SQLE.getErrorCode() != 2588) {
                errcount = 1;
                errmsg = SQLE.getMessage();
            }
        }
        try {
            fexrs.close();
        }
        catch (SQLException E) { }
    }
}
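
The control method reads each thread's result only after join() returns, which is what makes the unsynchronized count field safe to read. The sketch below illustrates the same start-all, join-all pattern with a stand-in worker and adds a grand total of rows across sessions. Worker, getCount() and runAll() are hypothetical names used for illustration; ExportThread as listed exposes only getResult().

```java
import java.util.ArrayList;
import java.util.List;

public class JoinAndTotal {

    // Stand-in for ExportThread: instead of fetching rows from an export
    // session it simply counts up to a fixed number.
    static class Worker extends Thread {
        private final int rowsToReport;
        private int count = 0;

        Worker(String name, int rowsToReport) {
            super(name);
            this.rowsToReport = rowsToReport;
        }

        @Override
        public void run() {
            for (int i = 0; i < rowsToReport; i++) count++;
        }

        int getCount() { return count; }
    }

    // Start every worker first, then join each one and sum the row counts.
    static long runAll(int... rowsPerWorker) {
        List<Worker> workers = new ArrayList<>();
        for (int i = 0; i < rowsPerWorker.length; i++) {
            workers.add(new Worker("FEXP" + i, rowsPerWorker[i]));
        }
        for (Worker w : workers) w.start();

        long total = 0;
        for (Worker w : workers) {
            try {
                w.join();   // after join(), the worker's writes to count are visible here
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for " + w.getName(), e);
            }
            total += w.getCount();
        }
        return total;
    }

    public static void main(String[] args) {
        System.out.println("total rows: " + runAll(100, 200, 300));
        // total rows: 600
    }
}
```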