Introduction

When running batch data imports with Ansible, a common pattern is to read data from CSV, JSON, or API sources and insert records into a database. If the playbook fails midway through the batch and is re-run, or if the task itself has retries: configured, records that were already committed get inserted again. The duplicates occur because the insert operation is not idempotent: running it twice creates two copies of the data.

This issue is particularly problematic in Tower/AWX, where a failed job is typically relaunched from the start (by an operator, a workflow failure path, or external automation), or in environments with unstable network connections where connection timeouts trigger retries. The batch may appear to fail even though some records were already committed, and the retry adds duplicates rather than resuming from the failure point.

Symptoms

The playbook runs twice due to a failure and creates duplicate records:

```yaml
# playbook.yml
- name: Import users from CSV
  community.postgresql.postgresql_query:
    db: appdb
    query: "INSERT INTO users (username, email, created_at) VALUES (%s, %s, NOW())"
    positional_args:
      - "{{ item.username }}"
      - "{{ item.email }}"
  loop: "{{ users }}"
  retries: 3
  delay: 10
```

Database shows duplicates after retry:

```sql
SELECT username, COUNT(*) FROM users GROUP BY username HAVING COUNT(*) > 1;

 username | count
----------+-------
 jsmith   |     2
 mjones   |     2
 awilson  |     2
(3 rows)
```

Tower job log shows the retry pattern:

```bash
TASK [Import users from CSV] ***************************************************
failed: [localhost] (item={'username': 'jsmith', 'email': 'jsmith@example.com'}) => {"msg": "Connection timed out"}
... (retrying in 10 seconds)
failed: [localhost] (item={'username': 'jsmith', 'email': 'jsmith@example.com'}) => {"msg": "Connection timed out"}
... (retrying in 10 seconds)
ok: [localhost] => (item={'username': 'jsmith', 'email': 'jsmith@example.com'})
ok: [localhost] => (item={'username': 'mjones', 'email': 'mjones@example.com'})
ok: [localhost] => (item={'username': 'awilson', 'email': 'awilson@example.com'})
```

Query log shows duplicate inserts:

```bash
2026-04-11 14:14:22 INSERT INTO users (username, email, created_at) VALUES ('jsmith', 'jsmith@example.com', NOW());
2026-04-11 14:14:35 INSERT INTO users (username, email, created_at) VALUES ('jsmith', 'jsmith@example.com', NOW());
2026-04-11 14:14:35 INSERT INTO users (username, email, created_at) VALUES ('mjones', 'mjones@example.com', NOW());
2026-04-11 14:14:35 INSERT INTO users (username, email, created_at) VALUES ('awilson', 'awilson@example.com', NOW());
```

Common Causes

1. Non-Idempotent INSERT Statements

Standard INSERT statements always create new rows, even if the same data already exists:

```sql
-- This will create duplicates if run twice
INSERT INTO users (username, email) VALUES ('jsmith', 'j@example.com');
INSERT INTO users (username, email) VALUES ('jsmith', 'j@example.com');
-- Two rows created
```

2. Transaction Not Encompassing the Batch

Each loop iteration runs as a separate module call with its own database connection, so every insert commits independently. A failure at iteration 50 leaves records 1-49 committed:

```yaml
- name: Insert records
  postgresql_query:
    query: "INSERT INTO table VALUES (...)"
    # Each iteration is its own transaction
  loop: "{{ items }}"
```
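The effect of per-iteration commits can be reproduced outside Ansible. The following is a minimal sketch that uses Python's built-in sqlite3 module as a stand-in for PostgreSQL; the `import_rows` and `demo` helpers and the `fail_at` parameter are hypothetical names introduced only for illustration.

```python
import sqlite3


def import_rows(conn, rows, fail_at=None):
    """Insert rows one at a time and commit after each, like one task per loop item."""
    for index, (username, email) in enumerate(rows):
        if index == fail_at:
            raise ConnectionError(f"simulated timeout at record {index}")
        conn.execute(
            "INSERT INTO users (username, email) VALUES (?, ?)", (username, email)
        )
        conn.commit()  # each row is durable as soon as it is inserted


def demo():
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE users (id INTEGER PRIMARY KEY, username TEXT, email TEXT)"
    )
    rows = [
        ("jsmith", "jsmith@example.com"),
        ("mjones", "mjones@example.com"),
        ("awilson", "awilson@example.com"),
    ]
    try:
        import_rows(conn, rows, fail_at=2)  # first attempt dies on the third record
    except ConnectionError:
        pass
    after_failure = conn.execute("SELECT COUNT(*) FROM users").fetchone()[0]
    import_rows(conn, rows)  # the re-run starts again from the first record
    after_rerun = conn.execute("SELECT COUNT(*) FROM users").fetchone()[0]
    conn.close()
    return after_failure, after_rerun


if __name__ == "__main__":
    print(demo())
```

The first count shows the rows that survived the failed attempt; the second shows that the re-run inserted them again alongside the row that had been missing.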

3. Missing Unique Constraints

Without a unique constraint on the natural key, the database allows duplicates:

```sql
-- No constraint prevents duplicates
CREATE TABLE users (
    id SERIAL PRIMARY KEY,
    username VARCHAR(50),
    email VARCHAR(100)
    -- No UNIQUE constraint on username or email!
);
```
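For contrast, here is a small sketch of what changes once the natural key is declared unique. It uses Python's sqlite3 module as a stand-in for PostgreSQL (PostgreSQL reports a unique violation in the same situation), and the function name is hypothetical.

```python
import sqlite3


def second_insert_rejected() -> bool:
    """Return True if the database refuses a repeated username."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE users ("
        "id INTEGER PRIMARY KEY, username TEXT UNIQUE, email TEXT)"
    )
    row = ("jsmith", "j@example.com")
    conn.execute("INSERT INTO users (username, email) VALUES (?, ?)", row)
    try:
        # Same natural key again, as a retry would send it
        conn.execute("INSERT INTO users (username, email) VALUES (?, ?)", row)
    except sqlite3.IntegrityError:
        return True
    finally:
        conn.close()
    return False


if __name__ == "__main__":
    print(second_insert_rejected())
```

With the constraint in place a retry fails loudly instead of silently duplicating data, which is why the constraint is usually paired with an upsert.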

4. Auto-Replay by Ansible Tower

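When a failed job is relaunched in Tower/AWX, whether manually, from a workflow failure path, or by external automation, the entire playbook re-runs from the start. Nothing in the job template records which rows the failed run already committed:

```yaml
# Tower job template configuration
VERBOSITY: 1
JOB_SLICE_COUNT: 1
TIMEOUT: 3600
# A relaunch after failure starts again from the first task
```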

5. Task-Level Retries Don't Track State

The retries: directive re-runs the task without knowing which rows the previous attempt already committed:

```yaml
- name: Batch insert
  command: psql -c "INSERT..."
  retries: 3
  # A retry re-runs the whole command, including rows already inserted
```
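The failure mode is easiest to see when the write succeeds but the acknowledgement is lost. The sketch below is plain Python with a list standing in for the table; `flaky_batch_insert` and `run_with_retries` are hypothetical helpers, not Ansible internals.

```python
def flaky_batch_insert(table, rows, state):
    """Store every row, then fail on the first call as if the reply timed out."""
    table.extend(rows)  # the rows are already stored...
    state["calls"] += 1
    if state["calls"] == 1:
        # ...but the caller only sees an error
        raise TimeoutError("simulated timeout after commit")


def run_with_retries(task, retries=3):
    """Blindly re-run the whole task on failure and return the attempt that succeeded."""
    for attempt in range(1, retries + 1):
        try:
            task()
            return attempt
        except TimeoutError:
            if attempt == retries:
                raise


def demo():
    table, state = [], {"calls": 0}
    rows = ["jsmith", "mjones", "awilson"]
    attempts = run_with_retries(lambda: flaky_batch_insert(table, rows, state))
    return attempts, table


if __name__ == "__main__":
    print(demo())
```

Because the retry loop has no record of what the first attempt stored, every row ends up in the table twice.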

Step-by-Step Fix

Step 1: Add Unique Constraints and Use Upsert

Add database constraints to prevent duplicates at the schema level:

```sql
-- Add unique constraint on natural key
ALTER TABLE users ADD CONSTRAINT users_username_unique UNIQUE (username);

-- Or a composite unique constraint
ALTER TABLE users ADD CONSTRAINT users_username_email_unique UNIQUE (username, email);
```
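A unique constraint cannot be added while duplicate rows still exist, so a table that has already been hit by this problem needs a cleanup first. The following is a sketch of one possible cleanup, using Python's sqlite3 module as a stand-in for PostgreSQL. Keeping the row with the lowest id is an assumption; pick whichever survivor rule suits your data.

```python
import sqlite3


def dedupe_and_constrain(conn):
    """Keep the lowest id per username, then enforce uniqueness."""
    with conn:  # both statements commit together or not at all
        conn.execute(
            "DELETE FROM users WHERE id NOT IN "
            "(SELECT MIN(id) FROM users GROUP BY username)"
        )
        conn.execute(
            "CREATE UNIQUE INDEX users_username_unique ON users (username)"
        )


def demo():
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE users (id INTEGER PRIMARY KEY, username TEXT, email TEXT)"
    )
    conn.executemany(
        "INSERT INTO users (username, email) VALUES (?, ?)",
        [
            ("jsmith", "jsmith@example.com"),
            ("jsmith", "jsmith@example.com"),  # duplicate left by a retry
            ("mjones", "mjones@example.com"),
        ],
    )
    conn.commit()
    dedupe_and_constrain(conn)
    usernames = [r[0] for r in conn.execute("SELECT username FROM users ORDER BY id")]
    conn.close()
    return usernames


if __name__ == "__main__":
    print(demo())
```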

Modify your playbook to use upsert (INSERT ... ON CONFLICT):

```yaml
- name: Import users with upsert
  community.postgresql.postgresql_query:
    db: appdb
    query: >
      INSERT INTO users (username, email, created_at)
      VALUES (%s, %s, NOW())
      ON CONFLICT (username) DO UPDATE SET
        email = EXCLUDED.email,
        updated_at = NOW()
    positional_args:
      - "{{ item.username }}"
      - "{{ item.email }}"
  loop: "{{ users }}"
```
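To see why the upsert makes retries harmless, here is a minimal sketch using Python's sqlite3 module as a stand-in for PostgreSQL. SQLite 3.24 or later accepts the same ON CONFLICT ... DO UPDATE form; the `import_users` helper is a hypothetical name.

```python
import sqlite3

UPSERT = """
INSERT INTO users (username, email) VALUES (?, ?)
ON CONFLICT (username) DO UPDATE SET email = excluded.email
"""


def import_users(conn, rows):
    """Upsert every row; running this twice leaves the table unchanged."""
    with conn:
        conn.executemany(UPSERT, rows)


def demo():
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE users ("
        "id INTEGER PRIMARY KEY, username TEXT UNIQUE, email TEXT)"
    )
    rows = [("jsmith", "jsmith@example.com"), ("mjones", "mjones@example.com")]
    import_users(conn, rows)
    import_users(conn, rows)  # simulated retry: no new rows
    import_users(conn, [("jsmith", "new@example.com")])  # changed data is updated
    result = conn.execute(
        "SELECT username, email FROM users ORDER BY username"
    ).fetchall()
    conn.close()
    return result


if __name__ == "__main__":
    print(demo())
```

The table ends with one row per username, and the changed email replaces the old one rather than adding a second row.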

Step 2: Use Transaction Block for Batch Atomicity

Run the entire batch in a single transaction so a failure rolls back everything. Separate BEGIN and COMMIT tasks do not achieve this, because each postgresql_query task opens its own connection and the transaction ends when that task finishes. Send the whole batch as one statement instead. The example below passes the records as a single JSON parameter and expands them server-side with json_to_recordset (PostgreSQL 9.4 or later):

```yaml
- name: Import users atomically
  block:
    - name: Import all users in one statement (one transaction)
      community.postgresql.postgresql_query:
        db: appdb
        query: >
          INSERT INTO users (username, email, created_at)
          SELECT username, email, NOW()
          FROM json_to_recordset(%s::json) AS t(username text, email text)
          ON CONFLICT (username) DO NOTHING
        positional_args:
          - "{{ users | to_json }}"

  rescue:
    - name: Report failure
      debug:
        msg: "Batch import failed, no rows from this batch were committed"
```
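The all-or-nothing behaviour of a single transaction can be sketched in a few lines. This uses Python's sqlite3 module as a stand-in for PostgreSQL; the `import_atomically` helper and the empty-email validation rule are assumptions made only to trigger a failure mid-batch.

```python
import sqlite3


def import_atomically(conn, rows):
    """Insert all rows in one transaction; any error rolls back the whole batch."""
    with conn:  # commits on success, rolls back if an exception escapes
        for username, email in rows:
            if not email:
                raise ValueError(f"missing email for {username}")
            conn.execute(
                "INSERT INTO users (username, email) VALUES (?, ?) "
                "ON CONFLICT (username) DO NOTHING",
                (username, email),
            )


def count_users(conn):
    return conn.execute("SELECT COUNT(*) FROM users").fetchone()[0]


def demo():
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE users ("
        "id INTEGER PRIMARY KEY, username TEXT UNIQUE, email TEXT)"
    )
    conn.commit()
    try:
        # The second record is invalid, so the first must not survive either
        import_atomically(conn, [("jsmith", "jsmith@example.com"), ("mjones", "")])
    except ValueError:
        pass
    after_failure = count_users(conn)
    import_atomically(
        conn, [("jsmith", "jsmith@example.com"), ("mjones", "mjones@example.com")]
    )
    after_retry = count_users(conn)
    conn.close()
    return after_failure, after_retry


if __name__ == "__main__":
    print(demo())
```

Since the failed attempt leaves nothing behind, the retry starts from a clean state and cannot create duplicates.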

Step 3: Track Import Progress for Resume Capability

Implement a tracking mechanism to skip already-imported records:

```yaml
- name: Batch import with resume capability
  hosts: localhost
  vars:
    import_table: users
    source_file: users.csv
    batch_size: 100

  tasks:
    - name: Read source data
      community.general.read_csv:
        path: "{{ source_file }}"
      register: source_data

    - name: Get already imported records
      community.postgresql.postgresql_query:
        db: appdb
        query: "SELECT username FROM {{ import_table }}"
      register: existing_records

    - name: Filter out already imported records
      set_fact:
        new_records: >-
          {{ source_data.list
             | rejectattr('username', 'in', existing_records.query_result | map(attribute='username') | list)
             | list }}

    - name: Display import summary
      debug:
        msg:
          - "Total records in source: {{ source_data.list | length }}"
          - "Already imported: {{ existing_records.query_result | length }}"
          - "New records to import: {{ new_records | length }}"

    - name: Import new records in batches
      community.postgresql.postgresql_query:
        db: appdb
        # Each batch is passed as one JSON parameter and expanded server-side
        query: >
          INSERT INTO {{ import_table }} (username, email, created_at)
          SELECT username, email, NOW()
          FROM json_to_recordset(%s::json) AS t(username text, email text)
          ON CONFLICT (username) DO NOTHING
        positional_args:
          - "{{ item | to_json }}"
      loop: "{{ new_records | batch(batch_size) | list }}"
      loop_control:
        index_var: batch_index
        label: "Batch {{ batch_index + 1 }}/{{ (new_records | length / batch_size) | round(0, 'ceil') | int }}"
```
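The filter-then-batch logic in this step is independent of Ansible and can be checked on its own. Below is a plain-Python sketch of the same idea; `pending_records` and `batches` are hypothetical helper names, and dropping repeats within the source file is an extra safeguard that the playbook above does not perform.

```python
def pending_records(source, existing_usernames):
    """Return source records whose username has not been imported yet."""
    seen = set(existing_usernames)
    pending = []
    for record in source:
        if record["username"] not in seen:
            seen.add(record["username"])  # also drops repeats within the source
            pending.append(record)
    return pending


def batches(records, size):
    """Yield consecutive slices of at most `size` records."""
    for start in range(0, len(records), size):
        yield records[start:start + size]


if __name__ == "__main__":
    source = [{"username": name} for name in ["jsmith", "mjones", "awilson"]]
    todo = pending_records(source, existing_usernames=["jsmith"])
    for number, batch in enumerate(batches(todo, size=2), start=1):
        print(number, [record["username"] for record in batch])
```

A set lookup keeps the filter fast even for large sources, whereas testing membership against a list scales with the number of existing rows for every record.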

Step 4: Use COPY for Large Batch Performance

For large datasets, use PostgreSQL COPY, which is much faster than row-by-row inserts. COPY itself has no ON CONFLICT clause, so load the file into a staging table and merge from there:

```yaml
- name: Import large CSV via COPY
  hosts: localhost
  vars:
    csv_file: /tmp/users_import.csv
    table_name: users

  tasks:
    # A TEMP table would vanish when this task's connection closes,
    # so use a regular staging table that later tasks can still see.
    - name: Create staging table
      community.postgresql.postgresql_query:
        db: appdb
        query: >
          CREATE TABLE IF NOT EXISTS {{ table_name }}_staging
          (LIKE {{ table_name }} INCLUDING ALL)

    - name: Empty staging table left over from a failed run
      community.postgresql.postgresql_query:
        db: appdb
        query: "TRUNCATE {{ table_name }}_staging"

    - name: Copy data to staging table
      community.postgresql.postgresql_copy:
        db: appdb
        copy_from: "{{ csv_file }}"  # path as seen by the PostgreSQL server
        dst: "{{ table_name }}_staging"
        columns:
          - username
          - email
        options:
          format: csv  # add "header: true" if the file has a header row

    - name: Merge staging into main table
      community.postgresql.postgresql_query:
        db: appdb
        query: >
          INSERT INTO {{ table_name }} (username, email, created_at)
          SELECT username, email, NOW()
          FROM {{ table_name }}_staging
          ON CONFLICT (username) DO UPDATE SET
            email = EXCLUDED.email,
            updated_at = NOW()

    - name: Drop staging table
      community.postgresql.postgresql_query:
        db: appdb
        query: "DROP TABLE {{ table_name }}_staging"
```
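The staging-and-merge pattern can be tried without a PostgreSQL server. The sketch below uses Python's sqlite3 module as a stand-in, with executemany playing the role of COPY; `load_via_staging` is a hypothetical helper, and the `WHERE true` clause is a SQLite parsing requirement that PostgreSQL does not need.

```python
import sqlite3

MERGE = """
INSERT INTO users (username, email)
SELECT username, email FROM users_staging
WHERE true  -- SQLite needs a WHERE clause here to parse the upsert
ON CONFLICT (username) DO UPDATE SET email = excluded.email
"""


def load_via_staging(conn, rows):
    """Bulk-load rows into the staging table, then merge them in one transaction."""
    with conn:
        conn.execute("DELETE FROM users_staging")  # clear leftovers from a failed run
        conn.executemany(
            "INSERT INTO users_staging (username, email) VALUES (?, ?)", rows
        )
        conn.execute(MERGE)


def demo():
    conn = sqlite3.connect(":memory:")
    conn.executescript(
        """
        CREATE TABLE users (id INTEGER PRIMARY KEY, username TEXT UNIQUE, email TEXT);
        CREATE TABLE users_staging (username TEXT, email TEXT);
        INSERT INTO users (username, email) VALUES ('jsmith', 'old@example.com');
        """
    )
    rows = [("jsmith", "jsmith@example.com"), ("mjones", "mjones@example.com")]
    load_via_staging(conn, rows)
    load_via_staging(conn, rows)  # a repeated load changes nothing
    result = conn.execute(
        "SELECT username, email FROM users ORDER BY username"
    ).fetchall()
    conn.close()
    return result


if __name__ == "__main__":
    print(demo())
```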

Step 5: Implement Idempotent Import Playbook

Create a complete idempotent import playbook:

```yaml
# import_users.yml - Idempotent user import
- name: Import users from CSV idempotently
  hosts: localhost
  vars:
    csv_path: "{{ csv_file | default('users.csv') }}"
    table: users
    unique_column: username

  tasks:
    - name: Validate CSV exists
      stat:
        path: "{{ csv_path }}"
      register: csv_stat

    - name: Fail if CSV missing
      fail:
        msg: "CSV file not found: {{ csv_path }}"
      when: not csv_stat.stat.exists

    - name: Read CSV file
      community.general.read_csv:
        path: "{{ csv_path }}"
        delimiter: ','
      register: csv_data

    - name: Create import batch record
      community.postgresql.postgresql_query:
        db: appdb
        query: >
          INSERT INTO import_batches (source_file, record_count, status, started_at)
          VALUES (%s, %s, 'running', NOW())
          RETURNING id
        positional_args:
          - "{{ csv_path }}"
          - "{{ csv_data.list | length }}"
      register: batch_record

    - name: Process records idempotently
      block:
        # All rows travel as one JSON parameter, so the upsert is a single
        # statement and either every row is applied or none is.
        - name: Upsert records
          community.postgresql.postgresql_query:
            db: appdb
            query: >
              INSERT INTO {{ table }} (username, email, full_name, created_at)
              SELECT username, email, full_name, NOW()
              FROM json_to_recordset(%s::json)
                AS t(username text, email text, full_name text)
              ON CONFLICT ({{ unique_column }}) DO UPDATE SET
                email = EXCLUDED.email,
                full_name = EXCLUDED.full_name,
                updated_at = NOW()
            positional_args:
              - "{{ csv_data.list | to_json }}"
          register: import_result

        - name: Mark batch complete
          community.postgresql.postgresql_query:
            db: appdb
            query: >
              UPDATE import_batches
              SET status = 'completed', completed_at = NOW(), records_processed = %s
              WHERE id = %s
            positional_args:
              - "{{ csv_data.list | length }}"
              - "{{ batch_record.query_result[0].id }}"

      rescue:
        - name: Mark batch failed
          community.postgresql.postgresql_query:
            db: appdb
            query: >
              UPDATE import_batches
              SET status = 'failed', completed_at = NOW(), error_message = %s
              WHERE id = %s
            positional_args:
              - "{{ ansible_failed_result.msg | default('Unknown error') }}"
              - "{{ batch_record.query_result[0].id }}"

        - name: Rethrow failure
          fail:
            msg: "Import failed: {{ ansible_failed_result.msg | default('Unknown error') }}"

    - name: Report import results
      debug:
        msg:
          - "Import completed successfully"
          - "Records processed: {{ csv_data.list | length }}"
          - "Batch ID: {{ batch_record.query_result[0].id }}"
```
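The playbook relies on an import_batches table that is not defined in this article. The sketch below shows one possible shape for it together with the same bookkeeping flow, using Python's sqlite3 module as a stand-in for PostgreSQL. The column types, the `tracked_import` helper, and the other function names are assumptions; only the column names come from the queries above.

```python
import sqlite3

SCHEMA = """
CREATE TABLE users (
    id INTEGER PRIMARY KEY,
    username TEXT NOT NULL UNIQUE,
    email TEXT
);
CREATE TABLE import_batches (
    id INTEGER PRIMARY KEY,
    source_file TEXT,
    record_count INTEGER,
    records_processed INTEGER,
    status TEXT,
    error_message TEXT,
    started_at TEXT DEFAULT CURRENT_TIMESTAMP,
    completed_at TEXT
);
"""

UPSERT = (
    "INSERT INTO users (username, email) VALUES (?, ?) "
    "ON CONFLICT (username) DO UPDATE SET email = excluded.email"
)


def new_database():
    conn = sqlite3.connect(":memory:")
    conn.executescript(SCHEMA)
    return conn


def tracked_import(conn, source_file, rows):
    """Record the batch, upsert its rows, and store the final status."""
    cur = conn.execute(
        "INSERT INTO import_batches (source_file, record_count, status) "
        "VALUES (?, ?, 'running')",
        (source_file, len(rows)),
    )
    batch_id = cur.lastrowid
    conn.commit()  # the batch record must survive a rollback of the rows
    try:
        with conn:  # rows and the 'completed' mark commit together
            conn.executemany(UPSERT, rows)
            conn.execute(
                "UPDATE import_batches SET status = 'completed', "
                "records_processed = ?, completed_at = CURRENT_TIMESTAMP "
                "WHERE id = ?",
                (len(rows), batch_id),
            )
    except sqlite3.Error as exc:
        with conn:
            conn.execute(
                "UPDATE import_batches SET status = 'failed', error_message = ?, "
                "completed_at = CURRENT_TIMESTAMP WHERE id = ?",
                (str(exc), batch_id),
            )
        raise
    return batch_id


def batch_status(conn, batch_id):
    row = conn.execute(
        "SELECT status FROM import_batches WHERE id = ?", (batch_id,)
    ).fetchone()
    return row[0]


if __name__ == "__main__":
    db = new_database()
    first = tracked_import(db, "users.csv", [("jsmith", "jsmith@example.com")])
    print(first, batch_status(db, first))
```

Committing the batch record before the rows means a failed import still leaves an audit entry, while the rows themselves are rolled back.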

Verification

Test idempotency by running the import multiple times:

```bash
# First run
ansible-playbook import_users.yml -e "csv_file=test_users.csv"
# Check record count
psql -d appdb -c "SELECT COUNT(*) FROM users;"

# Second run (should not create duplicates)
ansible-playbook import_users.yml -e "csv_file=test_users.csv"
# Check record count - should be same
psql -d appdb -c "SELECT COUNT(*) FROM users;"

# Verify no duplicates
psql -d appdb -c "
SELECT username, COUNT(*) as cnt
FROM users
GROUP BY username
HAVING COUNT(*) > 1;
"
# Should return 0 rows
```

Verify upsert works correctly:

```sql
-- Check that updates are applied
SELECT username, email, updated_at FROM users WHERE username = 'test_user';

-- Run import with changed email for test_user
-- Verify email was updated and updated_at changed
SELECT username, email, updated_at FROM users WHERE username = 'test_user';
```

Check import batch tracking:

```sql
SELECT id, source_file, record_count, status, started_at, completed_at
FROM import_batches
ORDER BY started_at DESC
LIMIT 5;
```
  • [ansible-batch-writer-commits-partial-results-before-final-validation](/articles/ansible-batch-writer-commits-partial-results-before-final-validation) - Partial commit issues
  • [ansible-dead-letter-queue-fills-because-a-poison-message-is-never-quarantined](/articles/ansible-dead-letter-queue-fills-because-a-poison-message-is-never-quarantined) - Message handling
  • [ansible-job-runner-replays-completed-work-after-lease-expiry](/articles/ansible-job-runner-replays-completed-work-after-lease-expiry) - Job replay issues