Using Multiload with TdRedux

NOTE: The implementation described herein does not implement restartable MLOAD jobs.

While it is possible to implement restartable MLOAD with TdRedux, the MLOAD support provided is intended for moderately sized, "convenience" workloads rather than large-scale loads (e.g., initial terabyte loads).

ERRORLIMITs are supported as of release 1.29 of TdRedux (see below).

Using Multiload with TdRedux requires some special programming considerations. In order to support the full range of IMPORT multiload capability, TdRedux extends the accepted SQL syntax with MLOAD control statements (BEGIN MLOAD, MLOAD ... WITH ... ERRORTABLES, ERRORLIMIT, CHECKPOINT LOADING, EXEC MLOAD, END MLOAD, and RELEASE MLOAD) and DML option directives (e.g., DO INSERT FOR MISSING UPDATE ROWS, MARK DUPLICATE ROWS, MARK MISSING ROWS), all of which appear in the steps and example below.

The default DML options are "MARK DUPLICATE ROWS; MARK MISSING ROWS;".

In addition, to emulate multiple DML labels, the application submits multiple multi-statement requests on the control SQL session, and can provide a "job control" bitmask as a special parameter on each MLOAD session.
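
For example, a two-label job might be defined on the control session roughly as follows. This is a minimal sketch only: the defineJobs() helper, the target1/target2 table names, and the two-column record layout are hypothetical, and the BEGIN MLOAD / MLOAD ... WITH ... ERRORTABLES setup described in the steps below (with autocommit disabled) is assumed to have been executed already.

// Minimal sketch (not part of the example classes below): defining two DML
// "labels" by submitting two multi-statement requests on the control session.
// Assumes BEGIN MLOAD and the MLOAD ... WITH ... ERRORTABLES requests have
// already been executed on this connection and autocommit is off.
static void defineJobs(java.sql.Connection ctlcon) throws java.sql.SQLException {
    java.sql.Statement ctlstmt = ctlcon.createStatement();

    // the USING clause defines the client record layout and prefixes each job request
    String using = "USING (col1 integer, col5 varchar(40)) ";

    // job 0 (bit 0 of the job control bitmask): an UPSERT
    ctlstmt.executeUpdate(using
        + "DO INSERT FOR MISSING UPDATE ROWS;\n"
        + "UPDATE target1 SET col5 = :col5 WHERE col1 = :col1;\n"
        + "INSERT INTO target1 VALUES(:col1, :col5);");

    // job 1 (bit 1 of the job control bitmask): a plain INSERT
    ctlstmt.executeUpdate(using
        + "INSERT INTO target2 VALUES(:col1, :col5);");

    ctlcon.commit();
    ctlstmt.close();
}

Each record subsequently sent on an MLOAD session carries a job control bitmask indicating which of these two jobs should receive it (bit 0 for the first request, bit 1 for the second).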

In brief:

  1. Logon a regular DBC/SQL session with the ";lsn=0" connection URL parameter; this connection is referred to as the "control connection".
  2. Logon 1 or more MLOAD sessions by setting the "partition=MLOAD" parameter, setting the ";jobs=n" connection URL parameter to indicate the number of MLOAD jobs (i.e., DML labels) to be executed, and setting the ";lsn=lsn" URL parameter to the LSN returned from the control session via the tdrdxGetLSN() method.
  3. create a Statement object on the control connection via Connection.createStatement() (known as the "control Statement")
  4. using the control Statement, executeUpdate("BEGIN MLOAD <tablename>[, <tablename>...];"). Up to 5 tables may be included.
  5. For each table, using the control Statement, executeUpdate("MLOAD <tablename> WITH <worktable> ERRORTABLES <errortable>, <dup_missing_table>;")
  6. using the control Statement, executeUpdate("ERRORLIMIT N;") if you wish to enforce a limit on the number of errors to tolerate. Note that the actual number of errors which trigger termination of the multiload may exceed the specified ERRORLIMIT value, especially in a multithreaded implementation. If the ERRORLIMIT is exceeded, the next commit() on each multiload connection will throw an exception with the message "[TdRedux] Error limit exceeded.", a SQLSTATE of "S1000", and an error code of -1. (ERRORLIMIT is implemented as a local SQL directive and is not actually sent to the DBMS for processing, and hence does not return a valid DBMS error code.)
  7. Turn off autocommit on the control connection (the example below also disables autocommit on each MLOAD session as it is logged on).
  8. For each multi-statement job, using the control Statement, executeUpdate(<job-request-SQL>). Note that each job request must include the USING clause which defines the source data record format.
  9. call commit() on the control session
  10. using the control Statement, executeUpdate("CHECKPOINT LOADING;")
  11. on each MLOAD session, create a PreparedStatement using just the USING clause portion of the job requests, suffixed with " INSERT INTO"
  12. using PreparedStatement.setInt() with parameter numbers -2 and -3, set a sequence number and rolling offset, respectively, used to identify individual records sent to MLOAD. TdRedux will use the original sequence number for the first record sent by the session, and then add the offset to the sequence number for each record sent thereafter on that session.
  13. the job control bitmask is generated by setting, in a byte value, the bit corresponding to each job to which the record should be sent (yes, each job whose bit is set gets its own copy of the record). Bit 0 corresponds to the first job request executed on the control session, bit 1 corresponds to the second job request, etc. If no bitmask is provided, the default is for every job to receive a copy of each record.
  14. set parameter values as normal and execute in a loop. NOTE: the application is responsible for keeping track of how full the message transfer buffer is, and thus must compute the length of each record as it will be sent to the database. Add 14 to the record length, then multiply that result by the number of jobs the record will be sent to; the running total for a buffer must not exceed 64000 bytes (see the sketch following this list).
  15. if using the driver-specific array binding capability, the job control bitmask (if provided) must be bound via tdrdxBindByteArray().
  16. when a message buffer-full of records has been set/bound and executed, call commit() on the MLOAD session. This commit will initiate the transfer of the data to the database.
  17. If desired, a "CHECKPOINT LOADING" request can be executed on the control Statement when all MLOAD sessions are quiescent, in order to permit recovery from a known record number in case of a failure.
  18. When all the records have been transferred, a "CHECKPOINT LOADING END;" request is sent on the control Statement, followed by commit(). This ends the acquisition phase.
  19. With AutoCommit turned on, execute "EXEC MLOAD <table>[; EXEC MLOAD <table>; ...]" (one statement per table being loaded) on the control Statement to initiate the APPLY phase. Note that this request may run for an extended period of time.
  20. When the APPLY phase completes, disable AutoCommit and execute an "END MLOAD;", followed by commit(), to complete the MLOAD operation.
  21. After a BEGIN MLOAD has been executed, if any error occurs which requires job termination, execute a "RELEASE MLOAD <table>[, <table>...] [ APPLY ]" request on the control session to release the MLOAD. The "APPLY" qualifier should only be sent when the APPLY phase has been initiated; if the failure occurs during the acquisition phase, "CHECKPOINT LOADING END;" and "END MLOAD;" should be executed prior to the RELEASE.
  22. logoff all the MLOAD sessions
  23. (optionally) check the error table and dup/missing table for errors, duplicates, and/or missing records.
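
The buffer accounting described in step 14 amounts to arithmetic like the following sketch. The trackRecord() helper is hypothetical; only the 14-byte per-record overhead and the 64000-byte ceiling come from the step above.

// Hypothetical helper illustrating the step-14 buffer arithmetic: each record
// costs (record length + 14) bytes per job it is sent to; when the next record
// would push the running total past 64000, commit() is called on the MLOAD
// session to transfer the buffered records, and the total is reset.
static int trackRecord(java.sql.Connection mlcon,
                       int bufferedBytes,
                       int recordLength,
                       int jobsTargeted) throws java.sql.SQLException {
    int cost = (recordLength + 14) * jobsTargeted;
    if (bufferedBytes + cost > 64000) {
        mlcon.commit();        // initiates the transfer of the buffered data
        bufferedBytes = 0;
    }
    return bufferedBytes + cost;
}

The application would call such a helper before executing each record's parameter set, carrying the returned value forward as the running total for the next call.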

The following example illustrates a 2-job multiload, including an UPSERT operation. It consists of 2 classes: the main control class, TdRdxMload, and a thread class, MultiloadThread, which runs an MLOAD session. By creating multiple MultiloadThread objects, an efficient parallel MLOAD job can be run.

Example MLOAD Control class


/*
 *    TdRdxMload.java - Test suite for Multiload via TdRedux JDBC driver
 *
 *    Copyright (C) 2003, Presicient Corp.
 *
 */
import java.io.*;
import java.util.*;
import java.math.*;
import java.sql.*;
import com.presicient.tdredux.*;

//
//    Test harness for TdRedux
//
class TdRdxMload {
    static String myURL = null;

    static public void main(String args[]) {

        System.out.println("Test 0: Load driver");
        try {
            Class.forName("com.presicient.tdredux.Driver");
        }
        catch (ClassNotFoundException E) {
            System.out.println("Can't locate TdRedux driver...check your CLASSPATH.");
            System.exit(-1);
        }
        myURL = new String("jdbc:tdredux:" + args[0] + ";user=" + args[1] + ";password=" + args[2]);
//
//    Test 18: MULTILOAD wo array binding
//
        System.out.println("Test 18: MULTILOAD, no array bind");
        mltest(false);
//
//    Test 19: MULTILOAD w array binding
//
        System.out.println("Test 19: MULTILOAD, array bind");
        mltest(true);
        System.out.println("Tests complete.");
    }


/************************************************************
 *    run multiload test
 ************************************************************/
    static void mltest(boolean use_array)
    {
        System.out.println("Testing multiload...");
//
//    our USING clause defines the record layout
//
        String usingstmt = new String(
"USING (col1 integer, col2 smallint, col3 byteint, col4 char(20), "
+ "col5 varchar(40), col6 float, col7 decimal(2,1), "
+ "col8 decimal(4,2), col9 decimal(8,4), col10 decimal(14,5), "
+ "col11 DATE, col12 TIME(2), col13 TIMESTAMP(3)) ");

        String mlstmts[] = new String[2];
//
//    the first job is an upsert
//    that also tracks the records which were
//    actually updated
//
        mlstmts[0] = new String(
"DO INSERT FOR MISSING UPDATE ROWS;\n" +
"MARK DUPLICATE UPDATE ROWS;\n" +
"UPDATE fltypetst SET col5 = :col5 where col1 = :col1;\n" +
"INSERT INTO fltypetst VALUES(:col1, :col2, :col3, :col4, :col5, :col6,\n" +
":col7, :col8, :col9, :col10, :col11, :col12, :col13);");
//
//    the second job is an insert
//    that also tracks duplicate records
//
        mlstmts[1] = new String(
"MARK DUPLICATE INSERT ROWS;\n" +
"INSERT INTO fltypetst2 VALUES(:col1, :col2, :col3, :col4, :col5, :col6,\n" +
":col7, :col8, :col9, :col10, :col11, :col12, :col13);");

        int numsess = 2;
        int lsn = -1;
        com.presicient.tdredux.Connection ctlcon = null;
        java.sql.Connection mlcon[] = new com.presicient.tdredux.Connection[numsess];
        java.sql.Statement ctlstmt = null;
        java.sql.ResultSet rs = null;
        MultiloadThread mlthread[] = new MultiloadThread[numsess];

        try {
//
//    logon the control session to get an LSN
//
            System.out.println("Logging on control session..");
            ctlcon = (com.presicient.tdredux.Connection)DriverManager.getConnection(myURL + ";lsn=0");
            lsn = ctlcon.tdrdxGetLSN();
            System.out.println("Logging on error session..");
        }
        catch (SQLException SqlEx) {
            System.out.println(SqlEx.getMessage());
            return;
        }
//
//    due to the way we handle DATE types from USING clauses,
//    we need to make sure we use INTEGERDATE
//
        System.out.println("Force dateform to integerdate...");
        try {
            ctlstmt = ctlcon.createStatement();
            rs = ctlstmt.executeQuery("SET SESSION DATEFORM=INTEGERDATE");
            rs.close();
        }
        catch (SQLException SQLE) {
            System.out.println(SQLE.getMessage());
            return;
        }
//
//    start with a clean slate...
//
        System.out.println("Dropping tables...");
        dropTable("wt_fltypetst", ctlstmt);
        dropTable("et_fltypetst", ctlstmt);
        dropTable("uv_fltypetst", ctlstmt);
        dropTable("wt_fltypetst2", ctlstmt);
        dropTable("et_fltypetst2", ctlstmt);
        dropTable("uv_fltypetst2", ctlstmt);
        dropTable("fltypetst", ctlstmt);
        dropTable("fltypetst2", ctlstmt);

        try {
            ctlstmt.executeUpdate(
"CREATE TABLE fltypetst, NO FALLBACK ("
+ "col1 integer, col2 smallint, col3 byteint, col4 char(20), "
+ "col5 varchar(40), col6 float, col7 decimal(2,1), "
+ "col8 decimal(4,2), col9 decimal(8,4), col10 decimal(14,5), "
+ "col11 date, col12 time, col13 timestamp) unique primary index(col1);");
        }
        catch (SQLException SQLE) {
            System.out.println(SQLE.getMessage());
            return;
        }

        try {
            ctlstmt.executeUpdate(
"CREATE TABLE fltypetst2, NO FALLBACK ("
+ "col1 integer, col2 smallint, col3 byteint, col4 char(20), "
+ "col5 varchar(40), col6 float, col7 decimal(2,1), "
+ "col8 decimal(4,2), col9 decimal(8,4), col10 decimal(14,5), "
+ "col11 date, col12 time, col13 timestamp) unique primary index(col1);");
        }
        catch (SQLException SQLE) {
            System.out.println(SQLE.getMessage());
            return;
        }
//
//    logon our Multiload sessions
//
        int i = 0;
        try {
            for (i = 0; i < numsess; i++) {
                System.out.println("Logging on MULTILOAD session " + i + "..");
                mlcon[i] = DriverManager.getConnection(
//
//    NOTE that we supply the number of jobs here
//    so that everyone knows how many jobs we're processing
//
                    myURL + ";partition=MLOAD;lsn=" + lsn + ";jobs=" + mlstmts.length);
                mlcon[i].setAutoCommit(false);
//
//    and create a thread for it
//
                mlthread[i] = new MultiloadThread(mlcon[i], usingstmt, mlstmts.length,
                    (i*5000)+1, 5000, i, numsess, use_array);
                mlthread[i].setName(new String("MLOAD" + i));
            }
        }
        catch (SQLException SqlEx) {
//
//    only exit if its *not* an "All AMPS FULL" error
//
            System.out.println(SqlEx.getMessage());
            if (SqlEx.getErrorCode() != 3615) {
                for (i--; i >= 0; i--) {
                    try {
                        mlcon[i].close();
                    }
                    catch (SQLException SQLE) { }
                }
                try {
                    ctlcon.close();
                }
                catch (SQLException SQLE) { }
                return;
            }
            numsess = i;    // otherwise adjust our session count
        }
//
//    now start multiloading
//    NOTE: ORDER OF EXECUTION IS IMPORTANT HERE!!!
//
        long mlstarted = System.currentTimeMillis();
        try {
            ctlstmt.executeUpdate("BEGIN MLOAD fltypetst, fltypetst2");
            ctlstmt.executeUpdate("MLOAD fltypetst WITH wt_fltypetst ERRORTABLES et_fltypetst, uv_fltypetst;");
            ctlstmt.executeUpdate("MLOAD fltypetst2 WITH wt_fltypetst2 ERRORTABLES et_fltypetst2, uv_fltypetst2;");
            ctlstmt.executeUpdate("ERRORLIMIT 1000");
            ctlcon.setAutoCommit(false);
        }
        catch (SQLException SQLE) {

            System.out.println(SQLE.getMessage());
//
//    release the mload
//
            for (i = 0; i < numsess; i++) {
                try {
                    mlcon[i].close();
                }
                catch (SQLException E) { }
            }
//
//    it takes a while for mload to clean up internally, so
//    we may need to release a few times
//
            while (true) {
                try {
                    ctlcon.setAutoCommit(true);
                    ctlstmt.executeUpdate("RELEASE MLOAD fltypetst, fltypetst2;");
                    break;
                }
                catch (SQLException E2) {
                    System.out.println(E2.getMessage());
                    if (E2.getErrorCode() != 2571)
                        break;
                }
                catch (Exception E) {
                    System.out.println(E.getMessage());
                    break;
                }
            }
            return;
        }
//
//    This is where TdRedux is smart about the supplied queries...
//    it will parse and break them up, and process directives
//    internally
//
        try {
            ctlstmt.executeUpdate(usingstmt + mlstmts[0]);
            ctlstmt.executeUpdate(usingstmt + mlstmts[1]);
            ctlcon.commit();
            ctlstmt.executeUpdate("CHECKPOINT LOADING;");
        }
        catch (SQLException SQLE) {
            System.out.println(SQLE.getMessage());
//
//    release the mload
//
            for (i = 0; i < numsess; i++) {
                try {
                    mlcon[i].close();
                }
                catch (SQLException E) { }
            }
//
//    it takes a while for mload to clean up internally, so
//    we may need to release a few times
//
            while (true) {
                try {
                    ctlcon.setAutoCommit(true);
                    ctlstmt.executeUpdate("RELEASE MLOAD fltypetst, fltypetst2;");
                    break;
                }
                catch (SQLException E2) {
                    System.out.println(E2.getMessage());
                    if (E2.getErrorCode() != 2571)
                        break;
                }
                catch (Exception E) {
                    System.out.println(E.getMessage());
                    break;
                }
            }
            return;
        }
//
//    start our threads
//
        for (i = 0; i < numsess; i++) {
            mlthread[i].start();
        }
//
//    wait for completion
//
        int errcount = 0;
        for (i = 0; i< numsess; i++) {
            try {
                mlthread[i].join();
                String mlresult = mlthread[i].getResult();

                System.out.println("Thread " + i + " completed with result " +
                                   mlresult);
                if (! mlresult.startsWith("OK")) errcount++;
            }
            catch (InterruptedException IE) { }
        }

        if (errcount != 0) {
            for (i = 0; i < numsess; i++) {
                try {
                    mlcon[i].close();
                }
                catch (SQLException E) { }
            }
//
//    we must loop here since we always get the 2571 error
//    on the first RELEASE
//
            while (true) {
                try {
                    ctlcon.setAutoCommit(true);
                    ctlstmt.executeUpdate("RELEASE MLOAD fltypetst, fltypetst2;");
                    break;
                }
                catch (SQLException E) {
                    System.out.println(E.getMessage());
                    if (E.getErrorCode() != 2571) break;
                }
                catch (Exception E) {
                    System.out.println(E.getMessage());
                }
            }
        }
        else {
            System.out.println("Start APPLY phase...");
            try {
                ctlstmt.executeUpdate("CHECKPOINT LOADING END;");
                ctlcon.commit();
                ctlcon.setAutoCommit(true);
            }
            catch (SQLException SQLE) {
                System.out.println(SQLE.getMessage());
            }
//
//    start the apply phase
//
            try {
                System.out.println("Acquisition phase complete, starting apply phase.");
                ctlstmt.executeUpdate("EXEC MLOAD fltypetst; EXEC MLOAD fltypetst2;");
            }
            catch (SQLException SQLE) {
                System.out.println(SQLE.getMessage());
            }
            try {
                ctlcon.setAutoCommit(false);
                ctlstmt.executeUpdate("END MLOAD;");
                ctlcon.commit();
            }
            catch (SQLException SQLE) {
                System.out.println(SQLE.getMessage());
            }
//
//    wrap up
//
            mlstarted = (System.currentTimeMillis() - mlstarted)/1000;
            System.out.println("Load time was " + mlstarted + " secs");

            for (i = 0; i < numsess; i++) {
                try {
                    mlcon[i].close();
                }
                catch (SQLException SQLE) { }
            }
        }
//
//    retrieve the error records and report them here
//
        int errcnt = 0;
        int dupcnt = 0;
        try {
            ctlcon.setAutoCommit(true);
            rs = ctlstmt.executeQuery("SELECT COUNT(*) from et_fltypetst");
            rs.next();
            errcnt = rs.getInt(1);

            rs = ctlstmt.executeQuery("SELECT COUNT(*) from et_fltypetst2");
            rs.next();
            errcnt += rs.getInt(1);
            if (errcnt != 0)
                System.out.println( errcnt + " errors generated during multiload of fltypetst and/or fltypetst2.");
            else
                System.out.println("Multiload completed successfully." );

            rs = ctlstmt.executeQuery("SELECT COUNT(*) from uv_fltypetst");
            rs.next();
            dupcnt = rs.getInt(1);

            rs = ctlstmt.executeQuery("SELECT COUNT(*) from uv_fltypetst2");
            rs.next();
            dupcnt += rs.getInt(1);
        }
        catch (SQLException SQLE) {
            System.out.println(SQLE.getMessage());
        }

        System.out.println("Dropping work tables...");
        dropTable("wt_fltypetst", ctlstmt);
        dropTable("wt_fltypetst2", ctlstmt);

        if (errcnt == 0) {
            dropTable("et_fltypetst", ctlstmt);
            dropTable("et_fltypetst2", ctlstmt);
        }

        if (dupcnt == 0) {
            dropTable("uv_fltypetst", ctlstmt);
            dropTable("uv_fltypetst2", ctlstmt);
        }
        try {
            ctlcon.close();
        }
        catch (SQLException SQLE) { }
        System.out.println("Multiload completed successfully.");
    }

    static public void dropTable(String table, java.sql.Statement stmt) {
        try {
            stmt.executeUpdate("DROP TABLE " + table);
        }
        catch (SQLException SQLE) {
            if (SQLE.getErrorCode() != 3807) {
                System.out.println(SQLE.getMessage());
            }
        }
    }
}


Example Multiload thread class


import java.io.*;
import java.util.*;
import java.math.*;
import java.sql.*;
import com.presicient.tdredux.*;

//
//    Test harness for TdRedux Multiload
//
public class MultiloadThread extends Thread {

    private java.sql.Connection conn = null;
    private int startat = 0;
    private int count = 0;
    private int col1[];
    private short col2[];
    private byte col3[];
    private String col4[];
    private String col5[];
    private double col6[];
    private BigDecimal col7[];
    private BigDecimal col8[];
    private BigDecimal col9[];
    private BigDecimal col10[];
    private java.sql.Date col11[];
    private java.sql.Time col12[];
    private java.sql.Timestamp col13[];
    private byte mlbitmask[];
    private int mlseqnums[];

    private static final String alphas =
    "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789_ ";

    private Random r = null;
    private int errcount = 0;
    private String errmsg = new String("OK");
    private boolean use_array = false;
    private String usingstmt = null;
    private int mljobcnt = 0;
    private int offset = 0;
    private int seqnum = 0;

    static protected byte[] jobmasks = { 1,2,4,8,16,32,64 };

    public MultiloadThread(java.sql.Connection conn,
        String usingstmt, int jobcnt, int startidx, int count, int seqnum,
        int offset, boolean use_array) {
        this.conn = conn;
        this.startat = startidx;
        this.count = count;
        this.usingstmt = usingstmt;
        col1 = new int[250];
        col2 = new short[250];
        col3 = new byte[250];
        col4 = new String[250];
        col5 = new String[250];
        col6 = new double[250];
        col7 = new BigDecimal[250];
        col8 = new BigDecimal[250];
        col9 = new BigDecimal[250];
        col10 = new BigDecimal[250];
        col11 = new java.sql.Date[250];
        col12 = new java.sql.Time[250];
        col13 = new java.sql.Timestamp[250];
        mlbitmask = new byte[250];
        mlseqnums = new int[250];
        r = new Random(System.currentTimeMillis());
        mljobcnt = jobcnt;
        this.offset = offset;
        this.seqnum = seqnum;
        this.use_array = use_array;
    }

    public void run() {
        com.presicient.tdredux.PreparedStatement mlstmt = null;
//
//    create our default job control bitmask
//    if you need to emulate the conditional APPLY
//    (ie, APPLY label WHERE...), only set the bit
//    for the job when the condition is true
//
        for (int i = 0; i < 250; i++) {
            for (int j=0; j < mljobcnt; j++)
                mlbitmask[i] += jobmasks[j];
        }

        try {
//
//    prepare the USING clause to define the record layout
//
            mlstmt = (com.presicient.tdredux.PreparedStatement)
                    conn.prepareStatement(usingstmt + " INSERT INTO ");
//
//    establish our sequnce number and offset;
//    we set these once and then let TdRedux manage them
//    from then on
//
            mlstmt.setInt(-2, seqnum);
            mlstmt.setInt(-3, offset);
        }
        catch (SQLException SQLE) {
            errcount = 1;
            errmsg = SQLE.getMessage();
            return;
        }
        int total = 0;
        while (total < count) {
            try {
                collect_recs_each(total+startat, 250, mljobcnt);
                total += 250;
                if (use_array) {
//
//    compute our running sequence numbers
//    the offset will be the number of multiload
//    threads, so our sequence numbers are interleaved
//    between the threads
//
                    mlstmt.tdrdxBindIntArray(1,col1);
                    mlstmt.tdrdxBindShortArray(2,col2);
                    mlstmt.tdrdxBindByteArray(3,col3);
                    mlstmt.tdrdxBindStringArray(4,col4);
                    mlstmt.tdrdxBindStringArray(5,col5);
                    mlstmt.tdrdxBindDoubleArray(6,col6);
                    mlstmt.tdrdxBindBigDecimalArray(7,col7);
                    mlstmt.tdrdxBindBigDecimalArray(8,col8);
                    mlstmt.tdrdxBindBigDecimalArray(9,col9);
                    mlstmt.tdrdxBindBigDecimalArray(10,col10);
                    mlstmt.tdrdxBindDateArray(11,col11);
                    mlstmt.tdrdxBindTimeArray(12,col12);
                    mlstmt.tdrdxBindTimestampArray(13,col13);
//
//    add the job mask as a "magic" parm; the sequence number was
//    set once above and is managed by TdRedux thereafter
//
                    mlstmt.tdrdxBindByteArray(-1, mlbitmask);
                    mlstmt.executeUpdate();
                }
                else {
                    for (int j = 0; j < 250; j++) {
                        mlstmt.setInt(1,col1[j]);
                        mlstmt.setShort(2,col2[j]);
                        mlstmt.setByte(3,col3[j]);
                        mlstmt.setString(4,col4[j]);
                        mlstmt.setString(5,col5[j]);
                        mlstmt.setDouble(6,col6[j]);
                        mlstmt.setBigDecimal(7,col7[j]);
                        mlstmt.setBigDecimal(8,col8[j]);
                        mlstmt.setBigDecimal(9,col9[j]);
                        mlstmt.setBigDecimal(10,col10[j]);
                        mlstmt.setDate(11,col11[j]);
                        mlstmt.setTime(12,col12[j]);
                        mlstmt.setTimestamp(13,col13[j]);
//
//    add the job mask as a "magic" parm; the sequence number was
//    set once above and is managed by TdRedux thereafter
//
                        mlstmt.setByte(-1, mlbitmask[j]);
                        mlstmt.executeUpdate();
                    }
                }
                conn.commit();
            }
            catch (SQLException SQLE) {
                errcount = 1;
                errmsg = new String(SQLE.getMessage());
                try {
                    mlstmt.close();
                }
                catch (SQLException E) { }
                return;
            }
            System.out.println(this.getName() + " loaded " + total + " rows...");
        } //end while
        try {
            mlstmt.close();
        }
        catch (SQLException SQLE) { }
    }

    public String getResult()
    {
        if (errcount == 0)
            return new String("OK. Loaded " + count + " rows.");
        return new String("ERROR: " + errmsg);
    }

    // absolute value, guarding against Integer.MIN_VALUE (whose negation overflows)
    private int abs(int v) { return (v == Integer.MIN_VALUE) ? 0 : ((v < 0) ? -v : v); }
//
//    some support methods to generate load data
//
    private String rndstring(int len)
    {
        char s[] = new char[len];
        for (int j = 0; j < len; j++)
            s[j] = alphas.charAt(abs(r.nextInt())%(alphas.length()));
        return new String(s);
    }

    private void collect_recs_each(int base, int count, int jobcnt)
    {
//
// col1 integer
// col2 smallint
// col3 byteint
// col4 char(20)
// col5 varchar(40)
// col6 float
// col7 decimal(2,1)
// col8 decimal(4,2)
// col9 decimal(8,4)
// col10 decimal(14,5)
// col11 date
// col12 time(2)
// col13 timestamp(3)
//
        int bufsize = 0;
        int pad = 14 + 2 + 1 + 2 + 4 + 20 + 2 + 8 + 1 + 2 + 4 + 8 + 4 + 10 + 23;
        for (int i = 0; (i < count) && (bufsize < 63000); i++, base++) {
            col1[i] = base;
            col2[i] = (short)(base%32767);
            col3[i] = (byte)(base%255 - 128);
            col4[i] = new String(rndstring(20));
            col5[i] = (base%20 == 0) ? null : new String(rndstring((abs(r.nextInt())%40)+1));
            col6[i] = r.nextDouble();
            long l = r.nextLong();
            col7[i] = new BigDecimal(l%100 * 0.1);
            col7[i] = col7[i].setScale(1, BigDecimal.ROUND_HALF_EVEN);
            col8[i] = new BigDecimal(l%10000 * 0.01);
            col8[i] = col8[i].setScale(2, BigDecimal.ROUND_HALF_EVEN);
            col9[i] = new BigDecimal(l%100000000 * 0.0001);
            col9[i] = col9[i].setScale(4, BigDecimal.ROUND_HALF_EVEN);
            col10[i] = new BigDecimal(l%100000000000L * 0.00001);
            col10[i] = col10[i].setScale(5, BigDecimal.ROUND_HALF_EVEN);
            col11[i] = (base%20 == 0) ? null : new java.sql.Date(System.currentTimeMillis());
            col12[i] = (base%20 == 0) ? null : new java.sql.Time(System.currentTimeMillis());
            col13[i] = (base%20 == 0) ? null : new java.sql.Timestamp(System.currentTimeMillis());
            bufsize += pad + ((col5[i] != null) ? col5[i].length() : 0);
        }
    }
}