Using Fastload with TdRedux

NOTE:The implementation described herein does not implement restartable Fastload jobs due to

While it is possible to implement restartable Fastload with TdRedux, the Fastload support provided is intended for moderately sized, "convenience" workloads, rather than large scale loads (e.g., initial terabyte loads).

Error limits are supported as of release 1.29 (see below).

Using Fastload with TdRedux requires some special programming considerations. In brief:

  1. Logon a regular DBC/SQL session with the ";lsn=0" connection URL parameter; this connection is refered to as the "control connection".
  2. Logon 1 or more Fastload sessions by setting the "partition=FASTLOAD" parameter AND setting the ";lsn=lsn" URL parameter to the LSN returned from the control session via the tdrdxGetLsn() method.
  3. create a Statement object on the control connection via Connection.createStatement() (known as the "control Statement")
  4. using the control Statement, executeUpdate("BEGIN LOADING <tablename> ERRORFILES <errortable1>, <errortable2>;").
  5. using the control Statement, executeUpdate("ERRORLIMIT N;") if you wish to enforce a limit on the number of errors to tolerate. Note that the actual number of errors which trigger termination of the fastload may exceed the the specified ERRORLIMIT value, especially if using a multithreaded implementation. If the ERRORLIMIT is exceeded, the next commit() on each fastload connection will throw an exception with the message, "[TdRedux] Error limit exceeded.", a SQLSTATE of "S1000", and an error code of -1. (ERRORLIMIT is implemented as a local SQL directive, and is not actually sent to the DBMS for processing, and hence does not return a valid DBMS error code).
  6. Turn off autocommit
  7. On the control session, executeUpdate(<INSERT-SQL>). Note that the SQL must include the USING clause which defines the source data record format.
  8. on each Fastload session, create a PreparedStatement using the INSERT SQL with USING clause.
  9. on each Fastload session, set parameter values as normal and execute in a loop. NOTE: the application is responsible for keeping track of how full the message transfer buffer is, and thus must compute the length of each record as it will be sent to the database. Be sure to add 4 to the record length for parcle headers. The total must not exceed 64000.
  10. when a message buffer-full of records has been set/bound and executed, call commit() on the Fastload session. This commit will initiate the transfer of the data to the database.
  11. If desired, a "CHECKPOINT LOADING" request can be executed on the control Statement when all Fastload sessions are quiescent, in order to permit recovery from a known record number in case of a failure.
  12. When all the records have been transfered, a "CHECKPOINT LOADING END;" request is sent on the control Statement, followed by commit(). This will end the acquisition phase.
  13. An "END LOADING" statement is then sent on the control session, followed by a commit() to finalize the load.
  14. After a BEGIN LOADING has been executed, if any error occurs which requires job termination, execute a "CHECKPOINT LOADING END;", commit(), "END LOADING", commit() on the control session to release the Fastload.
  15. logoff all the Fastload sessions
  16. (optionally)check the error tables to check for errors.

The following example illustrates a 2 job fastload. By creating multiple FastloadThread objects, a parallel efficient Fastload job can be run.

Example Fastload Control method


    static void fltest(boolean use_array)
    {
        System.out.println("Testing fastload...");
//
//    define our INSERT statement with a USING that
//    matches the record layout
//
        String instmt = new String(
"USING (col1 integer, col2 smallint, col3 byteint, col4 char(20), "
+ "col5 varchar(40), col6 float, col7 decimal(2,1), "
+ "col8 decimal(4,2), col9 decimal(8,4), col10 decimal(14,5), "
+ "col11 DATE, col12 TIME(2), col13 TIMESTAMP(3)) "
+ "INSERT INTO fltypetst VALUES(:col1, :col2, :col3, :col4, :col5, :col6, "
+ ":col7, :col8, :col9, :col10, :col11, :col12, :col13);");

        int numsess = 2;
        int lsn = -1;
        com.presicient.tdredux.Connection ctlcon = null;
        java.sql.Connection flcon[] = new java.sql.Connection[numsess];
        java.sql.Statement ctlstmt = null;
        java.sql.ResultSet rs = null;
        FastloadThread flthread[] = new FastloadThread[numsess];
//
//    logon the control session
//
        try {
            System.out.println("Logging on control session..");
            ctlcon = (com.presicient.tdredux.Connection)DriverManager.getConnection(myURL + ";lsn=0");
            lsn = ctlcon.tdrdxGetLSN();
            System.out.println("Logging on error session..");
        }
        catch (SQLException SqlEx) {
            System.out.println(SqlEx.getMessage());
            return;
        }
//
//    force INTEGERDATE, since TdRedux only supports INTEGERDATE
//    for USING clause input
//
        System.out.println("Force dateform to integerdate...");
        try {
            ctlstmt = ctlcon.createStatement();
            rs = ctlstmt.executeQuery("SET SESSION DATEFORM=INTEGERDATE");
            rs.close();
        }
        catch (SQLException SQLE) {
            System.out.println(SQLE.getMessage());
            return;
        }

        System.out.println("Dropping error tables...");
        dropTable("fltypetst_err1", ctlstmt);
        dropTable("fltypetst_err2", ctlstmt);
        dropTable("fltypetst", ctlstmt);
//
//    create our table
//
        try {
            ctlstmt.executeUpdate(
"CREATE TABLE fltypetst, NO FALLBACK ("
+ "col1 integer, col2 smallint, col3 byteint, col4 char(20), "
+ "col5 varchar(40), col6 float, col7 decimal(2,1), "
+ "col8 decimal(4,2), col9 decimal(8,4), col10 decimal(14,5), "
+ "col11 date, col12 time, col13 timestamp) unique primary index(col1);");
        }
        catch (SQLException SQLE) {
            System.out.println(SQLE.getMessage());
                return;
        }
//
//    logon our fastload sessions and start a thread for each
//
        try {
            for (int i = 0; i < numsess; i++) {
                System.out.println("Logging on FASTLOAD session " + i + "..");
                flcon[i] = DriverManager.getConnection(
                    myUtilURL + ";partition=FASTLOAD;lsn=" + lsn);
                flcon[i].setAutoCommit(false);

                flthread[i] = new FastloadThread(flcon[i], instmt, (i*5000)+1, 5000, use_array);
                flthread[i].setName(new String("FLOAD" + i));
            }
        }
        catch (SQLException SqlEx) {
            System.out.println(SqlEx.getMessage());
//
//    check if we've just filled up our AMPs
//
            if (SQLE.getErrorCode() != 3615)
                return;
            numsess = i;
        }
//
//    now start fastloading
//
        long flstarted = System.currentTimeMillis();
        try {
            ctlstmt.executeUpdate(
    "BEGIN LOADING fltypetst ERRORFILES fltypetst_ERR1, fltypetst_ERR2");
            ctlstmt.executeUpdate("ERRORLIMIT 1000");
            ctlcon.setAutoCommit(false);
        }
        catch (SQLException SQLE) {
//
// cleanup the load if we get an error
//
            System.out.println(SQLE.getMessage());
            try {
                ctlstmt.executeUpdate("CHECKPOINT LOADING END;");
            }
            catch (Exception E) { }
            try {
                ctlcon.commit();
            }
            catch (Exception E) { }
            try {
                ctlstmt.executeUpdate("END LOADING;");
            }
            catch (Exception E) { }
            try {
                ctlcon.commit();
            }
            catch (Exception E) { }
            return;
        }
//
//    execute the INSERT on the control session
//
        try {
            ctlstmt.executeQuery(instmt);
        }
        catch (SQLException SQLE) {
//
// cleanup the load if we get an error
//
            System.out.println(SQLE.getMessage());
            try {
                ctlstmt.executeUpdate("CHECKPOINT LOADING END;");
                ctlcon.commit();
            }
            catch (SQLException E) { }
            try {
                ctlstmt.executeUpdate("END LOADING;");
                ctlcon.commit();
            }
            catch (SQLException E) { }
            return;
        }
//
//    start our threads
//
        for (int i = 0; i <numsess; i++) {
            flthread[i].start();
        }
//
//    wait for completion
//
        for (int i = 0; i< numsess; i++) {
            try {
            flthread[i].join();
            System.out.println("Thread " + i + " completed with result " +
                flthread[i].getResult());
            }
            catch (InterruptedException IE) { }
        }
//
//    we're done loading, so wrap up
//
        System.out.println("Start END LOADING phase...");
        try {
            ctlstmt.executeUpdate("CHECKPOINT LOADING END;");
        }
        catch (SQLException SQLE) {
            System.out.println(SQLE.getMessage());
        }
        try {
            ctlcon.commit();
        }
        catch (SQLException SQLE) {
            System.out.println(SQLE.getMessage());
        }
        try {
            ctlstmt.executeUpdate("END LOADING;");
        }
        catch (SQLException SQLE) {
            System.out.println(SQLE.getMessage());
        }
        try {
            ctlcon.commit();
        }
        catch (SQLException SQLE) {
            System.out.println(SQLE.getMessage());
        }

        flstarted = (System.currentTimeMillis() - flstarted)/1000;
        System.out.println("Load time was " + flstarted + " secs");

        for (int i = 0; i < numsess; i++) {
            try {
            flcon[i].close();
            }
            catch (SQLException SQLE) { }
        }
//
//    retrieve the error records and report them here
//
        try {
            ctlcon.setAutoCommit(true);
            rs = ctlstmt.executeQuery("SELECT COUNT(*) from fltypetst_err1");
            rs.next();
            int errcnt = rs.getInt(1);
            if (errcnt != 0)
                System.out.println( errcnt + " errors generated during fastload of fltypetst.");
            else {
                rs = ctlstmt.executeQuery("SELECT COUNT(*) from fltypetst_err2");
                rs.next();
                errcnt = rs.getInt(1);
                if (errcnt != 0)
                    System.out.println( errcnt + " errors generated during fastload of fltypetst.");
                else
                    System.out.println("Fastload completed successfully." );
                rs = ctlstmt.executeQuery("SELECT COUNT(*) from fltypetst");
                rs.next();
                errcnt = rs.getInt(1);
                System.out.println("Loaded " + errcnt + " rows." );

                System.out.println("Dropping error tables...");
                dropTable("fltypetst_err2", ctlstmt);
                dropTable("fltypetst_err1", ctlstmt);
            }
        }
        catch (SQLException SQLE) {
            System.out.println(SQLE.getMessage());
        }
        try {
            ctlcon.close();
        }
        catch (SQLException SQLE) { }
        System.out.println("Fastload completed successfully.");
    }


Example Fastload Thread


import java.io.*;
import java.util.*;
import java.math.*;
import java.sql.*;
import com.presicient.tdredux.*;

//
//    Test harness for TdRedux Fastload
//
public class FastloadThread extends Thread {

    private java.sql.Connection conn = null;
    private int startat = 0;
    private int count = 0;
    private int col1[];
    private short col2[];
    private byte col3[];
    private String col4[];
    private String col5[];
    private double col6[];
    private BigDecimal col7[];
    private BigDecimal col8[];
    private BigDecimal col9[];
    private BigDecimal col10[];
    private java.sql.Date col11[];
    private java.sql.Time col12[];
    private java.sql.Timestamp col13[];

    private static final String alphas =
    "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789_ ";

    private Random r = null;
    private int errcount = 0;
    private String errmsg = new String("OK");
    private String instmt = null;
    private boolean use_array = false;

    public FastloadThread(java.sql.Connection conn,
        String instmt, int startidx, int count, boolean use_array) {
        this.conn = conn;
        this.startat = startidx;
        this.count = count;
        this.instmt = new String(instmt);
//
//    define our data generator arrays
//
        col1 = new int[250];
           col2 = new short[250];
        col3 = new byte[250];
        col4 = new String[250];
        col5 = new String[250];
           col6 = new double[250];
        col7 = new BigDecimal[250];
        col8 = new BigDecimal[250];
        col9 = new BigDecimal[250];
        col10 = new BigDecimal[250];
        col11 = new java.sql.Date[250];
        col12 = new java.sql.Time[250];
        col13 = new java.sql.Timestamp[250];
        r = new Random(System.currentTimeMillis());
        this.use_array = use_array;
    }

    public void run() {
        com.presicient.tdredux.PreparedStatement flstmt = null;
        try {
//
//    prepare actual stmt to get USING param types
//
            flstmt = (com.presicient.tdredux.PreparedStatement)
                    conn.prepareStatement(instmt);
        }
        catch (SQLException SQLE) { }
        int total = 0;
        while (total < count) {
            try {
                collect_recs_each(total+startat, 250);
                total += 250;
                if (use_array) {
//
//    bind arrays as needed
//
                    flstmt.tdrdxBindIntArray(1,col1);
                    flstmt.tdrdxBindShortArray(2,col2);
                    flstmt.tdrdxBindByteArray(3,col3);
                    flstmt.tdrdxBindStringArray(4,col4);
                    flstmt.tdrdxBindStringArray(5,col5);
                    flstmt.tdrdxBindDoubleArray(6,col6);
                    flstmt.tdrdxBindBigDecimalArray(7,col7);
                    flstmt.tdrdxBindBigDecimalArray(8,col8);
                    flstmt.tdrdxBindBigDecimalArray(9,col9);
                    flstmt.tdrdxBindBigDecimalArray(10,col10);
                    flstmt.tdrdxBindDateArray(11,col11);
                    flstmt.tdrdxBindTimeArray(12,col12);
                    flstmt.tdrdxBindTimestampArray(13,col13);
                    flstmt.execute();
                }
                else {
//
//    otherwise execute for each record to be loaded
//
                    for (int j = 0; j < 250; j++) {
                        flstmt.setInt(1,col1[j]);
                        flstmt.setShort(2,col2[j]);
                        flstmt.setByte(3,col3[j]);
                        flstmt.setString(4,col4[j]);
                        flstmt.setString(5,col5[j]);
                        flstmt.setDouble(6,col6[j]);
                        flstmt.setBigDecimal(7,col7[j]);
                        flstmt.setBigDecimal(8,col8[j]);
                        flstmt.setBigDecimal(9,col9[j]);
                        flstmt.setBigDecimal(10,col10[j]);
                        flstmt.setDate(11,col11[j]);
                        flstmt.setTime(12,col12[j]);
                        flstmt.setTimestamp(13,col13[j]);
                        flstmt.execute();
                    }
                }
//
//    this actually transmits the data to the
//    database
//
                conn.commit();
            }
            catch (SQLException SQLE) {
                errcount = 1;
                errmsg = new String(SQLE.getMessage());
                try {
                    flstmt.close();
                }
                catch (SQLException E) { }
                return;
            }
            System.out.println(this.getName() + " loaded " + total + " rows...");
        } //end while
//
//    all done, wrap up
//
        try {
            flstmt.close();
        }
        catch (SQLException SQLE) { }
    }

    public String getResult()
    {
        if (errcount == 0)
            return new String("OK. Loaded " + count + " rows.");
        return new String("ERROR: " + errmsg);
    }
//
//    various methods for generating load data
//
    private int abs(int v) { return (v < 0) ? v * -1 : v; }

    private String rndstring(int len)
    {
        char s[] = new char[len];
        for (int j = 0; j < len; j++)
            s[j] = alphas.charAt(abs(r.nextInt())%(alphas.length()));
        return new String(s);
    }

    private void collect_recs_each(int base, int count)
    {
//
// col1 integer
// col2 smallint
// col3 byteint
// col4 char(20)
// col5 varchar(40)
// col6 float
// col7 decimal(2,1)
// col8 decimal(4,2)
// col9 decimal(8,4)
// col10 decimal(14,5)
//
        for (int i = 0; i < count; i++, base++) {
            col1[i] = base;
            col2[i] = (short)(base%32767);
            col3[i] = (byte)(base%255 - 128);
            col4[i] = new String(rndstring(20));
            col5[i] = (base%20 == 0) ? null : new String(rndstring((abs(r.nextInt())%40)+1));
            col6[i] = r.nextDouble();
            long l = r.nextLong();
            col7[i] = new BigDecimal(l%100 * 0.1);
            col7[i] = col7[i].setScale(1, BigDecimal.ROUND_HALF_EVEN);
            col8[i] = new BigDecimal(l%10000 * 0.01);
            col8[i] = col8[i].setScale(2, BigDecimal.ROUND_HALF_EVEN);
            col9[i] = new BigDecimal(l%100000000 * 0.0001);
            col9[i] = col9[i].setScale(4, BigDecimal.ROUND_HALF_EVEN);
            col10[i] = new BigDecimal(l%100000000000L * 0.00001);
            col10[i] = col10[i].setScale(5, BigDecimal.ROUND_HALF_EVEN);
            col11[i] = (base%20 == 0) ? null : new java.sql.Date(System.currentTimeMillis());
            col12[i] = (base%20 == 0) ? null : new java.sql.Time(System.currentTimeMillis());
            col13[i] = (base%20 == 0) ? null : new java.sql.Timestamp(System.currentTimeMillis());
        }
    }
}