Using Fastload with DBD::Teradata

DBD::Teradata 2.2.0 provides improved support for Fastload. Using Fastload with DBD::Teradata requires some special considerations.

DBD::Teradata does not support recovery logging of utilities; i.e., you cannot PAUSE and then restart DBD::Teradata-based utility applications.

The attributes hash provided to tdat_UtilitySetup includes the following keys:

AttributeRequired ?Description
Utility Yes set to 'FASTLOAD'
SQL Yes The complete SQL INSERT request to be used for the fastload, including the USING clause.
Source Yes can be set to either a subroutine reference, a file description, or a connection handle to be used as the control session of a fastexport job for Loopback (see below) which will act as the source of the fastload data.

A file description is specified as

< VARTEXT 'c' | INDICDATA | DATA > filename

where 'c' is the separator character. Use '|' for compatibility with the default Teradata VARTEXT file format. INDICDATA indicates a fastload formatted file with indicator bytes, and DATA indicates a fastload formatted file without indicator bytes.

In addition, when binding rows, the subroutine is responsible for keeping track of the current size of the output buffer. Assuming the final size of the record can be computed, an additional 4 bytes per record is added by the driver for Fastload control information,

Report No a subroutine reference which is called with a status message as the Fastload progresses.
LogTables No an arrayref specifying the errortables to be used for the fastload. Default is the name of each table specified in the SQL requests, suffixed with '_err1' and '_err2'. (Tablenames longer than 27 characters are truncated prior to appending the suffix).

Note that empty errortables are automatically dropped when the fastload completes.

Checkpoint No the number of records processed between each checkpoint
Sessions Yes the number of fastload sessions to use
CheckpointCallback No a subroutine reference that is called each time a checkpoint is taken
Context No a hashref that can contain anything the application desires; it will be passed to the Source subroutine reference each time records are to be collected
MP No when set to a nonzero numeric value, or the string 'nothreads', causes multiple processes to be fork()'d, 1 per fastload session; when set to 'threads', causes multiple threads to be spawned, 1 per fastload session. This attribute may provide performance improvement on certain platforms (esp. multiprocessor platforms).
Loopback No when set to a SQL SELECT statement, causes a fastexport job to be created from which data will be taken and supplied to the fastload job. Note that the MP attribute must be specified as well. Each fork()'d process gets both a fastexport and a fastload session, and data is transferred directly from the fastexport session to the fastload session.
ErrorLimit No specifies the maximum number of errors to be permitted during the acquisition phase before the fastload is terminated. Default is 1,000,000 errors.
RequestSize No integer size in bytes; sets the maximum request buffer size to use for the utility sessions. Larger values permit more tuples to be sent to the DBMS in a single message. Default is 64256; maximum value is 1,048,000. Setting to a value outside that range, or for pre-V2R6.0 Teradata servers, will be silently ignored.
Retry No either a scalar or arrayref; if scalar, indicates the number of seconds to wait between retrying to start the fastload in the event the prior attempt failed due to no remaining utility job slots available on the database. If an arrayref, the first element is the number of seconds between retries, and the second element is the maximum number of retry attempts. The scalar form will retry indefinitely.

Examples

Multijob MP Fastload with subroutine source


    my $ctldbh = DBI->connect('dbi:Teradata:dbc', $username, $password,
        { RaiseError => 0, PrintError => 0, tdat_lsn => 0 });

    my $total = $ctldbh->tdat_UtilitySetup(
        {
        Utility => 'FASTLOAD',
        Sessions => $sesscount,
        SQL => 'USING (col1 integer, col2 smallint, col3 byteint, col4 char(20),
                        col5 varchar(100), col6 float, col7 decimal(2,1),
                        col8 decimal(4,2), col9 decimal(8,4), col10 float, col11 date,
                        col12 time, col13 timestamp(0))
                 INSERT INTO alltypetst VALUES(:col1, :col2, :col3, :col4, :col5, :col6,
                       :col7, :col8, :col9, :col10, :col11, :col12, :col13);',
        Checkpoint => 20000,
        LogTables => [ err1_alltypetst, err2_alltypetst ],
        Report => \&report_cb,
        Source => \&get_data,
        CheckpointCallback => \&checkpoint,
        Context => {
            Count => \$count,
            Runtime => \$mlstarted,
            Base => [ 0, 1000000 ],
            },
        MP => 1,
        Retry => [120, 3]	# retry every 2 minutes up to 3 times
        });

    print $ctldbh->errstr
        unless ($total && ($total > 0));

sub checkpoint {
    my ($function, $rowcount, $ctxt) = @_;

    my $flstarted = $ctxt->{Runtime};
    $$flstarted = time,
    print "$rowcount FASTLOAD sessions logged on.\n" and
    return 1
    if ($function eq 'INIT');

    $$flstarted  = time - $$flstarted,
    print "$rowcount rows loaded.\n" and
    return 1
    if ($function eq 'FINISH');

    print "Check point at $rowcount rows\n" and
    return 1
    if ($function eq 'CHECKPOINT');
    1;
}

sub get_data {
    my ($function, $sth, $sessnum, $maxrows, $ctxt) = @_;
    my ($ary, $rc, $rowcnt);

    $sth->{tdat_raw} = $ctxt->{Mode}, return -1
        if ($function eq 'INIT');

    return 0 if (($function eq 'MOREDATA') && (${$ctxt->{Count}} >= 10000));

    if ($function eq 'MOREDATA') {
        $ary = collect_recs_each($ctxt->{Count},
            ($maxrows > 280) ? 280 : $maxrows, $ctxt->{JobCount}, $ctxt->{Base}->[$sessnum]);
        print "Sending " . ($#{$$ary[0]}+1) . " rows for session $sessnum...\n";
        foreach my $i (0..$#$ary) {
            $rc = $sth->bind_param_array($i+1, $$ary[$i]);
        }

        return $#{$$ary[0]}+1;
    }

    print "Got IO Finish for $sessnum\n" and return -1
        if ($function eq 'FINISH');

    print "Got CHECKPOINT for $sessnum\n" and return -1
        if ($function eq 'CHECKPOINT');

    return 0;
}

sub report_cb {
    my ($msg) = @_;
    print $msg, "\n";
}


Multi-threaded MP Fastload with INDICDATA source


    my $ctldbh = DBI->connect('dbi:Teradata:dbc', $username, $password,
        { RaiseError => 0, PrintError => 0, tdat_lsn => 0 });

    my $total = $ctldbh->tdat_UtilitySetup(
        {
        Utility => 'FASTLOAD',
        Sessions => $sesscount,
        SQL => 'USING (col1 integer, col2 smallint, col3 byteint, col4 char(20),
                        col5 varchar(100), col6 float, col7 decimal(2,1),
                        col8 decimal(4,2), col9 decimal(8,4), col10 float, col11 date,
                        col12 time, col13 timestamp(0))
                 INSERT INTO alltypetst VALUES(:col1, :col2, :col3, :col4, :col5, :col6,
                       :col7, :col8, :col9, :col10, :col11, :col12, :col13);',
        Checkpoint => 20000,
        Report => \&report_cb,
        Source => 'INDICDATA rawdata.dat',
        CheckpointCallback => \&checkpoint,
        MP => 'threads'
        });

    print $ctldbh->errstr
        unless ($total && ($total > 0));

Multithreaded MP Fastload with VARTEXT source


    my $ctldbh = DBI->connect('dbi:Teradata:dbc', $username, $password,
        { RaiseError => 0, PrintError => 0, tdat_lsn => 0 });

    my $total = $ctldbh->tdat_UtilitySetup(
        {
        Utility => 'FASTLOAD',
        Sessions => $sesscount,
        SQL => 'USING (col1 varchar(9), col2 varchar(6), col3 varchar(4),
            col4 varchar(20), col5 varchar(100), col6 varchar(30), col7 varchar(4),
            col8 varchar(7), col9 varchar(10), col10 varchar(30), col11 varchar(10),
            col12 varchar(15), col13 varchar(19))
            INSERT INTO alltypetst VALUES(:col1, :col2, :col3, :col4, :col5, :col6,
                :col7, :col8, :col9, :col10, :col11, :col12, :col13);',
        Checkpoint => 20000,
        Report => \&report_cb,
        Source => "VARTEXT '|' vardata.txt",
        CheckpointCallback => \&checkpoint,
        MP => 'threads'
        });

    print $ctldbh->errstr
        unless ($total && ($total > 0));


Multijob MP Fastload with Loopback source


    my $ctldbh = DBI->connect('dbi:Teradata:dbc', $username, $password,
        { RaiseError => 0, PrintError => 0, tdat_lsn => 0 });
#
#	create EXPORT control session
#
    my $fedbh = DBI->connect("dbi:Teradata:otherdbc", $userid, $passwd,
    { PrintError => 0, RaiseError => 0, AutoCommit => 1, tdat_lsn => 0 });

    my $total = $ctldbh->tdat_UtilitySetup(
        {
        Utility => 'FASTLOAD',
        Sessions => $sesscount,
        SQL => 'USING (col1 integer, col2 smallint, col3 byteint, col4 char(20),
                        col5 varchar(100), col6 float, col7 decimal(2,1),
                        col8 decimal(4,2), col9 decimal(8,4), col10 float, col11 date,
                        col12 time, col13 timestamp(0))
                 INSERT INTO alltypetst VALUES(:col1, :col2, :col3, :col4, :col5, :col6,
                       :col7, :col8, :col9, :col10, :col11, :col12, :col13);',
        Checkpoint => 20000,
        Loopback => 'SELECT * FROM sometable',
        Source => $fedbh,
        Report => \&report_cb,
        CheckpointCallback => \&checkpoint,
        MP => 1
        });


    print $ctldbh->errstr
        unless ($total && ($total > 0));