Using Multiload with DBD::Teradata

DBD::Teradata 2.2.0 provides support for IMPORT multiload (DELETE multiload will be incorporated in a future release). Using Multiload with DBD::Teradata requires some special considerations.

DBD::Teradata does not support recovery logging of utilities; i.e., you cannot PAUSE and then restart DBD::Teradata-based utility applications. If DBD::Teradata detects a failure during a multiload operation, it will automatically submit a RELEASE MLOAD request to attempt to clean up.

The attributes hash provided to tdat_UtilitySetup includes the following keys:

Attribute Required? Description
Utility Yes set to 'MLOAD'
SQL Yes set to an arrayref of multistatement SQL requests.

Each multistatement request is treated as a separate Multiload job, in the same way as multiple .DML statements in a Multiload script. The SQL may include the DML extension statements defined for the MLOAD utility, namely:

  • < MARK | IGNORE > DUPLICATE [ < INSERT | UPDATE > ] ROWS
  • < MARK | IGNORE > MISSING [ < UPDATE | DELETE > ] ROWS
  • DO INSERT FOR [ MISSING UPDATE ] ROWS

These statements should precede the actual INSERT/UPDATE/DELETE statements. If none of them is specified, the default behavior is "MARK DUPLICATE ROWS; MARK MISSING ROWS;".
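As a minimal sketch of the ordering rule above, the helper below joins optional extension statements ahead of the DML statements to form one entry of the SQL arrayref. The name build_mload_request is hypothetical and not part of DBD::Teradata, and the column lists are shortened for illustration.

```perl
use strict;
use warnings;

# Hypothetical helper (not part of DBD::Teradata): builds one multistatement
# request, placing the MLOAD extension statements ahead of the DML statements.
sub build_mload_request {
    my ($extensions, $dml) = @_;
    # Strip any trailing semicolon, then terminate every statement uniformly.
    my @stmts = map { my $s = $_; $s =~ s/\s*;\s*\z//; "$s;" }
        (@$extensions, @$dml);
    return join("\n", @stmts);
}

my @sql = (
    build_mload_request(
        [ 'DO INSERT FOR MISSING UPDATE ROWS' ],
        [ 'UPDATE alltypetst SET col4 = :col4 WHERE col1 = :col1',
          'INSERT INTO alltypetst (col1, col4) VALUES (:col1, :col4)' ]),
    # No extension statements given, so the documented default
    # (MARK DUPLICATE ROWS; MARK MISSING ROWS;) would apply to this job.
    build_mload_request([], [ 'DELETE FROM alltypetst3 WHERE col1 = :col1' ]),
);

print "$_\n\n" for @sql;
```

The resulting @sql array has the shape expected by the SQL attribute: one string per Multiload job.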

Source Yes set to a subroutine reference, a file description, or (for Loopback, see below) a connection handle to be used as the control session of a fastexport job. Whichever is given acts as the source of the multiload data.

A file description is specified as

< VARTEXT 'c' | INDICDATA | DATA > filename

where 'c' is the separator character. Use '|' for compatibility with the default Teradata VARTEXT file format. INDICDATA indicates a fastload-formatted file with indicator bytes, and DATA indicates a fastload-formatted file without indicator bytes.
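The three forms of the file description can be illustrated with a small formatter. This is only a sketch: file_source is a hypothetical helper, not a DBD::Teradata function, and the file names are placeholders.

```perl
use strict;
use warnings;

# Hypothetical helper: formats a Source file description string of the form
#   < VARTEXT 'c' | INDICDATA | DATA > filename
sub file_source {
    my ($format, $filename, $separator) = @_;
    $format = uc $format;
    if ($format eq 'VARTEXT') {
        # '|' matches the default Teradata VARTEXT file format.
        $separator = '|' unless defined $separator;
        die "separator must be a single character\n"
            unless length($separator) == 1;
        return "VARTEXT '$separator' $filename";
    }
    die "unknown format '$format'\n"
        unless $format eq 'INDICDATA' || $format eq 'DATA';
    return "$format $filename";
}

print file_source('VARTEXT', 'vardata.txt'), "\n";
print file_source('VARTEXT', 'vardata.csv', ','), "\n";
print file_source('INDICDATA', 'rawdata.dat'), "\n";
print file_source('DATA', 'rawdata.dat'), "\n";
```

Any of the returned strings could be passed as the Source attribute value.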

If a subroutine reference is specified, the specified subroutine is responsible for providing a multiload bitmask when binding parameter data. The bitmask provides a means for an application to implement the conditional APPLY supported in Multiload scripts (i.e., "APPLY dmllabel WHERE condition"). The bitmask is supplied to the statement handle using the tdat_mlmask attribute, which is set to either a scalar value, or an arrayref of bitmasks (for array binding). When multiple multiload jobs are defined (i.e., more than 1 request is specified with the SQL attribute), each job receives its own copy of each record. This bitmask indicates which multiload jobs are to receive a copy of the associated record. If bit 0 is set, then the first job receives a copy of the associated record; if bit 1 is set, the second job receives a copy of the record.

  • If a scalar value is supplied with tdat_mlmask and array binding is used, the scalar is applied to all the bound records.
  • If no tdat_mlmask value is provided, every job gets a copy of every record.
  • If an arrayref supplied with tdat_mlmask is shorter than the array(s) used to bind records, a copy of each additional record is supplied to every job.
  • For file or Loopback sources, all jobs get copies of all records.
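The bitmask arithmetic can be sketched as follows, assuming bit 0 is the least significant bit of an integer mask. The helpers mlmask_for_jobs and jobs_in_mask are hypothetical names used only for illustration. Job numbers are zero-based, so job 0 is the first request in the SQL arrayref.

```perl
use strict;
use warnings;

# Hypothetical helper: builds a bitmask from zero-based job numbers.
sub mlmask_for_jobs {
    my @jobs = @_;
    my $mask = 0;
    $mask |= (1 << $_) for @jobs;
    return $mask;
}

# Hypothetical helper: lists the zero-based jobs selected by a mask.
sub jobs_in_mask {
    my ($mask, $jobcount) = @_;
    return grep { $mask & (1 << $_) } 0 .. $jobcount - 1;
}

my @masks = (
    mlmask_for_jobs(0),          # first job only
    mlmask_for_jobs(0, 2),       # first and third jobs
    mlmask_for_jobs(0, 1, 2),    # all three jobs
);
printf "mask %d -> jobs %s\n", $_, join(',', jobs_in_mask($_, 3))
    for @masks;

# Inside a Source subroutine, the masks would then be handed to the
# statement handle through the documented attribute, for example:
#   $sth->{tdat_mlmask} = \@masks;    # one mask per bound record
```

With array binding, the arrayref of masks lines up positionally with the bound records, so the first mask applies to the first record and so on.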

In addition, when binding rows, the subroutine is responsible for keeping track of the current size of the output buffer. Assuming the final size of the record can be computed, an additional 14 bytes per record is added by the driver for Multiload control information, and the size in the buffer will be multiplied by the number of multiload jobs to which the record is applied. In summary, each record will add the following number of bytes to the output buffer:

total_bytes = (record_length + 14 + indicator_byte_count) * number_of_jobs

Note that the subroutine supplies each record only once; DBD::Teradata will perform the replication of each record, based on the bitmask described above.
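A worked instance of the formula may help when sizing batches. The sketch below assumes that the number of jobs is the count of set bits in the record's bitmask, and that indicator bytes follow the usual one-bit-per-field layout rounded up to whole bytes. Both are assumptions for illustration, and all helper names are hypothetical.

```perl
use strict;
use warnings;

use constant MLOAD_OVERHEAD => 14;    # per-record control bytes, per the text

# Counts the set bits in a mask, i.e. how many jobs receive the record.
sub job_count {
    my ($mask) = @_;
    my $n = 0;
    for (; $mask; $mask >>= 1) { $n += $mask & 1 }
    return $n;
}

# ASSUMPTION: one indicator bit per field, rounded up to whole bytes.
sub indicator_bytes {
    my ($field_count) = @_;
    return int(($field_count + 7) / 8);
}

# total_bytes = (record_length + 14 + indicator_byte_count) * number_of_jobs
sub buffer_bytes {
    my ($record_length, $field_count, $mask) = @_;
    return ($record_length + MLOAD_OVERHEAD + indicator_bytes($field_count))
        * job_count($mask);
}

# A 100-byte record with 13 fields, sent to jobs 0 and 2 (mask 5):
print buffer_bytes(100, 13, 5), "\n";    # (100 + 14 + 2) * 2 = 232
```

A Source subroutine could accumulate this value per record and stop binding rows once the running total approaches the request buffer size.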

SourceFields Yes a USING clause string that describes the source record format
Report No a subroutine reference which is called with a status message as the Multiload progresses.
LogTables No a hashref that maps the tables to be multiloaded to their work, error, and duplicate tables. The table names are the keys, and the values are arrayrefs with the names of the work/error/duplicate tables (see examples below). Default is the name of each table specified in the SQL requests, prefixed with WT_, ET_, and UV_ for the work, error, and duplicate table, respectively (table names longer than 27 characters are truncated).

Note that the work tables are dropped automatically when the multiload completes. Empty error and duplicate tables are dropped as well.
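The default naming can be sketched as below. The helper default_log_tables is hypothetical, and it assumes the table name is cut to 27 characters before the three-character prefix is added. The text does not state the exact truncation order, so treat that detail as a guess.

```perl
use strict;
use warnings;

# Hypothetical helper mirroring the documented LogTables defaults.
# ASSUMPTION: the name is truncated to 27 characters, then prefixed.
sub default_log_tables {
    my @tables = @_;
    my %log;
    for my $table (@tables) {
        my $base = substr($table, 0, 27);
        # Order matches the documentation: work, error, duplicate.
        $log{$table} = [ map { $_ . $base } qw(WT_ ET_ UV_) ];
    }
    return \%log;
}

my $logtables = default_log_tables(qw(alltypetst alltypetst2));
print "$_ => @{ $logtables->{$_} }\n" for sort keys %$logtables;
```

The returned hashref has the same shape as an explicit LogTables value, which makes it easy to override only the entries that need custom names.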

Checkpoint No the number of records processed between each checkpoint
Sessions Yes the number of multiload sessions to use
CheckpointCallback No a subroutine reference that is called each time a checkpoint is taken
Context No a hashref that can contain anything the application desires; it will be passed to the Source subroutine reference each time records are to be collected
MP No when set to a nonzero numeric value, or the string 'nothreads', causes multiple processes to be fork()'d, 1 per multiload session. When set to the string 'threads', causes multiple threads to be spawned, 1 per multiload session. This attribute may improve performance on certain platforms (esp. multiprocessor platforms).
Loopback No when set to a SQL SELECT statement, causes a fastexport job to be created from which data will be taken and supplied to the multiload job. Note that the MP attribute must be specified as well. Each fork()'d process gets both a fastexport and a multiload session, and data is transferred directly from the fastexport session to the multiload session.
ErrorLimit No specifies the maximum number of errors to be permitted during the acquisition phase before the multiload is terminated. Default is 1,000,000 errors.
ActivityCounts No specifies a hashref to receive the Insert/Update/Delete activity counts for each MLOAD'ed table. The hashref will be keyed by the table names, and each table's entry will be a hashref with the keys "Inserts", "Updates", and "Deletes"; each value is the total rowcount applied for that activity on the given table (see example below).
RequestSize No integer size in bytes; sets the maximum request buffer size to use for the utility sessions. Larger values permit more tuples to be sent to the DBMS in a single message. Default is 64256; maximum value is 1,048,000. A value outside that range, or any value on a pre-V2R6.0 Teradata server, is silently ignored.
Retry No either a scalar or arrayref. If a scalar, it is the number of seconds to wait before retrying to start the multiload when the prior attempt failed because no utility job slots were available on the database. If an arrayref, the first element is the number of seconds between retries, and the second element is the maximum number of retry attempts. The scalar form will retry indefinitely.
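The two accepted forms of Retry can be read as follows. This is a sketch only: parse_retry is a hypothetical helper that shows how an application might interpret the value, not how the driver implements it.

```perl
use strict;
use warnings;

# Hypothetical helper: normalizes a Retry value into (seconds, max_attempts).
# max_attempts is undef for the scalar form, which retries indefinitely.
sub parse_retry {
    my ($retry) = @_;
    return () unless defined $retry;             # attribute not supplied
    return ($retry, undef) unless ref $retry;    # scalar form
    die "Retry must be a scalar or an arrayref\n"
        unless ref $retry eq 'ARRAY';
    return @{$retry}[0, 1];                      # [ seconds, max attempts ]
}

for my $retry (120, [ 120, 3 ]) {
    my ($seconds, $max) = parse_retry($retry);
    printf "wait %d s between attempts, %s\n", $seconds,
        defined $max ? "at most $max retries" : 'retrying indefinitely';
}
```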

Examples

Multijob MP Multiload with subroutine source


    my $ctldbh = DBI->connect('dbi:Teradata:dbc', $username, $password,
        { RaiseError => 0, PrintError => 0, tdat_lsn => 0 });

    my %act_counts = ();	# to receive each table's activity counts
    my $total = $ctldbh->tdat_UtilitySetup(
        {
        Utility => 'MLOAD',
        Sessions => $sesscount,
        SQL => [
            'DO INSERT FOR MISSING UPDATE ROWS;
            UPDATE alltypetst SET col4 = :col4 WHERE col1 = :col1;
            INSERT INTO alltypetst VALUES(:col1, :col2, :col3, :col4, :col5, :col6,
                :col7, :col8, :col9, :col10, :col11, :col12, :col13);',

            "MARK DUPLICATE INSERT ROWS;
            INSERT INTO alltypetst2 VALUES(:col1, :col2, :col3, :col4, :col5, :col6,
                :col7, :col8, :col9, :col10, :col11, TIME '10:20:30', :col13);",

            'MARK MISSING DELETE ROWS;
            DELETE FROM alltypetst3 WHERE col1 = :col1;'
        ],
        SourceFields => 'USING (col1 integer, col2 smallint, col3 byteint, col4 char(20),
                        col5 varchar(100), col6 float, col7 decimal(2,1),
                        col8 decimal(4,2), col9 decimal(8,4), col10 float, col11 date,
                        col12 time, col13 timestamp(0))',
        Checkpoint => 20000,
        LogTables => {
            alltypetst => [ 'wt_alltt', 'et_alltt', 'uv_alltt' ],
            alltypetst2 => [ 'wt_alltt2', 'et_alltt2', 'uv_alltt2' ],
            alltypetst3 => [ 'wt_alltt3', 'et_alltt3', 'uv_alltt3' ],
            },
        Report => \&report_cb,
        Source => \&ml_get_data,
        CheckpointCallback => \&mlcheckpoint,
        Context => {
            Count => \$count,
            Runtime => \$mlstarted,
            Base => [ 0, 1000000 ],
            JobCount => 3
            },
        ActivityCounts => \%act_counts,
        MP => 1,
        Retry => [ 120, 3] # retry every 2 minutes up to 3 times
        });
#
#	display our activity counts
#
    if ($total && ($total > 0)) {
    print '
Table    Inserts    Updates    Deletes
----------------------------------------
';
        # print the table name (the hash key), then its three counts
        print $_, "\t",
        	$act_counts{$_}->{Inserts}, "\t",
        	$act_counts{$_}->{Updates}, "\t",
        	$act_counts{$_}->{Deletes}, "\n"
        	foreach (keys %act_counts);
    }

    print $ctldbh->errstr
        unless ($total && ($total > 0));

sub mlcheckpoint {
    my ($function, $rowcount, $ctxt) = @_;

    my $flstarted = $ctxt->{Runtime};
    $$flstarted = time,
    print "$rowcount MLOAD sessions logged on.\n" and
    return 1
    if ($function eq 'INIT');

    $$flstarted  = time - $$flstarted,
    print "$rowcount rows loaded.\n" and
    return 1
    if ($function eq 'FINISH');

    print "Check point at $rowcount rows\n" and
    return 1
    if ($function eq 'CHECKPOINT');
    1;
}

sub ml_get_data {
    my ($function, $sth, $sessnum, $maxrows, $ctxt) = @_;
    my ($ary, $rc, $rowcnt);

    $sth->{tdat_raw} = $ctxt->{Mode}, return -1
        if ($function eq 'INIT');

    return 0 if (($function eq 'MOREDATA') && (${$ctxt->{Count}} >= 10000));

    if ($function eq 'MOREDATA') {
        $ary = collect_recs_each($ctxt->{Count},
            ($maxrows > 280) ? 280 : $maxrows, $ctxt->{JobCount}, $ctxt->{Base}->[$sessnum]);
        print "Sending " . ($#{$$ary[0]}+1) . " rows for session $sessnum...\n";
        foreach my $i (0..$#$ary) {
            $rc = $sth->bind_param_array($i+1, $$ary[$i]);
        }

        return $#{$$ary[0]}+1;
    }

    print "Got IO Finish for $sessnum\n" and return -1
        if ($function eq 'FINISH');

    print "Got CHECKPOINT for $sessnum\n" and return -1
        if ($function eq 'CHECKPOINT');

    return 0;
}

sub report_cb {
    my ($msg) = @_;
    print $msg, "\n";
}


Multithreaded MP Multiload with INDICDATA source


    my $ctldbh = DBI->connect('dbi:Teradata:dbc', $username, $password,
        { RaiseError => 0, PrintError => 0, tdat_lsn => 0 });

    my %act_counts = ();	# to receive each table's activity counts
    my $total = $ctldbh->tdat_UtilitySetup(
        {
        Utility => 'MLOAD',
        Sessions => $sesscount,
        SQL => [
            'DO INSERT FOR MISSING UPDATE ROWS;
            UPDATE alltypetst SET col4 = :col4 WHERE col1 = :col1;
            INSERT INTO alltypetst VALUES(:col1, :col2, :col3, :col4, :col5, :col6,
                :col7, :col8, :col9, :col10, :col11, :col12, :col13);',

            "MARK DUPLICATE INSERT ROWS;
            INSERT INTO alltypetst2 VALUES(:col1, :col2, :col3, :col4, :col5, :col6,
                :col7, :col8, :col9, :col10, :col11, TIME '10:20:30', :col13);",

            'MARK MISSING DELETE ROWS;
            DELETE FROM alltypetst3 WHERE col1 = :col1;'
        ],
        SourceFields => 'USING (col1 integer, col2 smallint, col3 byteint, col4 char(20),
                        col5 varchar(100), col6 float, col7 decimal(2,1),
                        col8 decimal(4,2), col9 decimal(8,4), col10 float, col11 date,
                        col12 time, col13 timestamp(0))',
        Checkpoint => 20000,
        LogTables => {
            alltypetst => [ 'wt_alltt', 'et_alltt', 'uv_alltt' ],
            alltypetst2 => [ 'wt_alltt2', 'et_alltt2', 'uv_alltt2' ],
            alltypetst3 => [ 'wt_alltt3', 'et_alltt3', 'uv_alltt3' ],
            },
        Report => \&report_cb,
        Source => 'INDICDATA rawdata.dat',
        CheckpointCallback => \&mlcheckpoint,
        ActivityCounts => \%act_counts,
        MP => 'threads'
        });
#
#	display our activity counts
#
    if ($total && ($total > 0)) {
    print '
Table    Inserts    Updates    Deletes
----------------------------------------
';
        # print the table name (the hash key), then its three counts
        print $_, "\t",
        	$act_counts{$_}->{Inserts}, "\t",
        	$act_counts{$_}->{Updates}, "\t",
        	$act_counts{$_}->{Deletes}, "\n"
        	foreach (keys %act_counts);
    }

    print $ctldbh->errstr
        unless ($total && ($total > 0));

Multijob MP Multiload with VARTEXT source


    my $ctldbh = DBI->connect('dbi:Teradata:dbc', $username, $password,
        { RaiseError => 0, PrintError => 0, tdat_lsn => 0 });

    my %act_counts = ();	# to receive each table's activity counts
    my $total = $ctldbh->tdat_UtilitySetup(
        {
        Utility => 'MLOAD',
        Sessions => $sesscount,
        SQL => [
            'DO INSERT FOR MISSING UPDATE ROWS;
            UPDATE alltypetst SET col4 = :col4 WHERE col1 = :col1;
            INSERT INTO alltypetst VALUES(:col1, :col2, :col3, :col4, :col5, :col6,
                :col7, :col8, :col9, :col10, :col11, :col12, :col13);',

            "MARK DUPLICATE INSERT ROWS;
            INSERT INTO alltypetst2 VALUES(:col1, :col2, :col3, :col4, :col5, :col6,
                :col7, :col8, :col9, :col10, :col11, TIME '10:20:30', :col13);",

            'MARK MISSING DELETE ROWS;
            DELETE FROM alltypetst3 WHERE col1 = :col1;'
        ],
        SourceFields => 'USING (col1 varchar(9), col2 varchar(6), col3 varchar(4),
            col4 varchar(20), col5 varchar(100), col6 varchar(30), col7 varchar(4),
            col8 varchar(7), col9 varchar(10), col10 varchar(30), col11 varchar(10),
            col12 varchar(15), col13 varchar(19))',
        Checkpoint => 20000,
        LogTables => {
            alltypetst => [ 'wt_alltt', 'et_alltt', 'uv_alltt' ],
            alltypetst2 => [ 'wt_alltt2', 'et_alltt2', 'uv_alltt2' ],
            alltypetst3 => [ 'wt_alltt3', 'et_alltt3', 'uv_alltt3' ],
            },
        Report => \&report_cb,
        Source => "VARTEXT '|' vardata.txt",
        CheckpointCallback => \&mlcheckpoint,
        ActivityCounts => \%act_counts,
        MP => 1
        });
#
#	display our activity counts
#
    if ($total && ($total > 0)) {
    print '
Table    Inserts    Updates    Deletes
----------------------------------------
';
        # print the table name (the hash key), then its three counts
        print $_, "\t",
        	$act_counts{$_}->{Inserts}, "\t",
        	$act_counts{$_}->{Updates}, "\t",
        	$act_counts{$_}->{Deletes}, "\n"
        	foreach (keys %act_counts);
    }

    print $ctldbh->errstr
        unless ($total && ($total > 0));


Multijob MP Multiload with Loopback source


    my $ctldbh = DBI->connect('dbi:Teradata:dbc', $username, $password,
        { RaiseError => 0, PrintError => 0, tdat_lsn => 0 });

    my $fedbh = DBI->connect("dbi:Teradata:otherdbc", $userid, $passwd,
    { PrintError => 0, RaiseError => 0, AutoCommit => 1, tdat_lsn => 0 });

    my $total = $ctldbh->tdat_UtilitySetup(
        {
        Utility => 'MLOAD',
        Sessions => $sesscount,
        SQL => [
            'DO INSERT FOR MISSING UPDATE ROWS;
            UPDATE alltypetst SET col4 = :col4 WHERE col1 = :col1;
            INSERT INTO alltypetst VALUES(:col1, :col2, :col3, :col4, :col5, :col6,
                :col7, :col8, :col9, :col10, :col11, :col12, :col13);',

            "MARK DUPLICATE INSERT ROWS;
            INSERT INTO alltypetst2 VALUES(:col1, :col2, :col3, :col4, :col5, :col6,
                :col7, :col8, :col9, :col10, :col11, TIME '10:20:30', :col13);",

            'MARK MISSING DELETE ROWS;
            DELETE FROM alltypetst3 WHERE col1 = :col1;'
        ],
        SourceFields => 'USING (col1 varchar(9), col2 varchar(6), col3 varchar(4),
            col4 varchar(20), col5 varchar(100), col6 varchar(30), col7 varchar(4),
            col8 varchar(7), col9 varchar(10), col10 varchar(30), col11 varchar(10),
            col12 varchar(15), col13 varchar(19))',
        Checkpoint => 20000,
        LogTables => {
            alltypetst => [ 'wt_alltt', 'et_alltt', 'uv_alltt' ],
            alltypetst2 => [ 'wt_alltt2', 'et_alltt2', 'uv_alltt2' ],
            alltypetst3 => [ 'wt_alltt3', 'et_alltt3', 'uv_alltt3' ],
            },
        Loopback => 'SELECT * FROM sometable',
        Source => $fedbh,
        Report => \&report_cb,
        CheckpointCallback => \&mlcheckpoint,
        MP => 1
        });

    print $ctldbh->errstr
        unless ($total && ($total > 0));