This update adds the requested support for self-referential fields.
This is useful if you need to compute and store potentially hundreds or thousands of objects and relationships quickly. To perform the inserts, it hits the database 1 plus N/100 times per affected table, where N is the number of rows to be inserted. On MySQL it uses either INSERT or LOAD DATA INFILE.
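The "1 plus N/100 per affected table" figure can be turned into a rough estimate. The sketch below is only an illustration of that arithmetic: the function name `estimated_db_calls`, the `pk_batch` parameter, the table names, and the choice to round the primary-key lookups up are all assumptions, not part of the manager.

```python
def estimated_db_calls(rows_per_table, pk_batch=100):
    """Estimate database calls for one commit.

    rows_per_table maps a table name to the number of rows queued for it.
    Each affected table costs one insert plus one primary-key lookup per
    pk_batch rows; rounding the lookups up is an assumption of this sketch.
    """
    total = 0
    for rows in rows_per_table.values():
        if rows <= 0:
            continue  # tables with nothing queued cost nothing
        lookups = (rows + pk_batch - 1) // pk_batch  # integer ceiling
        total += 1 + lookups
    return total


# Hypothetical workload: one parent row, 100 related rows, 100 join-table rows.
print(estimated_db_calls({"parent": 1, "related": 100, "join_table": 100}))
```

The point of the estimate is that the cost grows with the number of tables touched and only slowly with the number of rows, which is where the savings over per-object saves come from.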
Run this on your test database first and make sure all of your field defaults and null settings are appropriate: an attempt to insert a NULL where it isn't allowed can leave you with a partial insert.
This code is reasonably well tested and has been used for database pre-loading and for operations on live sites.
My test suite, however, is focused on my own use cases. Any input, e.g. failing cases, that helps create more tests would be appreciated.
Lots of details are in the doc string.
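One detail from the doc string is worth seeing in isolation: queued objects are identified by a hash of their keyword arguments (the "kwargs signature"), so identical arguments collapse into one queued object. The sketch below reuses `hash_dict` from the snippet; the `enqueue` helper and the module-level `queue` and `order` dicts are simplified stand-ins for the manager's internals, not its public interface.

```python
def hash_dict(dictionary):
    # Same helper as in the snippet: the hash ignores key order.
    return hash(frozenset(dictionary.items()))


queue = {}   # signature -> kwargs waiting to be inserted
order = {}   # signature -> position in which it was first queued


def enqueue(**kwargs):
    """Queue kwargs once per signature and return the signature."""
    key = hash_dict(kwargs)
    if key not in queue:
        queue[key] = kwargs
        order[key] = len(queue)
    return key


k1 = enqueue(first_name="John", last_name="Smith")
k2 = enqueue(last_name="Smith", first_name="John")   # same data, different order
k3 = enqueue(first_name="Jane", last_name="Smith")

print(k1 == k2)      # identical data shares one signature
print(len(queue))    # number of distinct objects queued
```

This is also why the manager cannot insert several rows with exactly the same data: they are indistinguishable before primary keys exist.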
Currently only MySQL is supported; however, there is some crude skeleton code for supporting other databases.
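On MySQL, the commit step picks between a plain INSERT and a file load based on the queue size and the configured threshold. A minimal sketch of that decision, mirroring the condition used in `bulk_insert_commit` further down; the function name `choose_insert_path` and the returned labels are hypothetical:

```python
def choose_insert_path(n_rows, threshold=1000):
    """Return 'insert' for a plain INSERT or 'file' for a temp-file load.

    A negative threshold disables the file path entirely.
    """
    if threshold < 0 or n_rows <= threshold:
        return "insert"
    return "file"


print(choose_insert_path(250))        # small batch
print(choose_insert_path(5000))       # large batch
print(choose_insert_path(5000, -1))   # file loading disabled
```

Setting a negative threshold with `bulk_insert_threshold()` is therefore the way to force plain INSERT statements, for example when the database user lacks the FILE privilege.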
# This Python file uses the following encoding: utf-8
import os, sys
import datetime
from tempfile import mkstemp
from django.conf import settings
from django.db import models, connection
from django.db.models import signals
from django.db.models.fields import AutoField, DateTimeField, DateField, TimeField, FieldDoesNotExist
from django.db.models.fields.related import ForeignKey, OneToOneField, ManyToManyField
from django.dispatch import dispatcher

def hash_dict(dictionary):
    #Hash of the (name, value) pairs; independent of key order
    return hash(frozenset(dictionary.items()))

class BulkInsert(models.Manager):
    """ BulkInsert Manager Class
    contact [email protected]

    Public Interface:
        bulk_insert(self, now=None, raw=False, send_pre_save=True, **kwargs)
        bulk_insert_commit(self, now=None, autoclobber=False, max_depth=5, send_post_save=True, **kwargs)
        bulk_insert_now(self, now=None)
        bulk_insert_threshold(self, threshold=1000)

    Update 5-9-2008: Added support for self references for Foreign Keys,
    OneToOne Fields and Many to Many relations.
    max_depth default changed to 5.

    Tested on Django SVN 7519.
    Currently, only MySQL is supported, although a crude framework is there to support others.

    Bulk insert objects to a database, including Foreign Key, One to One and Many to Many
    relationships.
    You can also create self referential relationships of each type, including (non)symmetrical
    Many to Many relationships.

    Insert an object by calling bulk_insert(**kwargs) where kwargs is what you would specify
    to create an instance of the underlying model, i.e. Model(**kwargs).
    Field names in kwargs can also include any related names declared on other models.

    IMPORTANT: Internally, because no primary keys are known until the objects are inserted,
    each object is identified by its "kwargs signature", which is a hash of the field names
    and values given to bulk_insert combined with any precomputed default values.
    If you want to insert the _exact_ same data in a bunch of rows, this won't help you.
    Objects with the same signature will be treated as if they are the same
    object.

    If you specify your own primary key values when the primary key is an AutoField,
    your values will be ignored.

    #####################
    #      Example      #
    #####################
    Given these four classes:

    class Author(models.Model):
        first_name = models.CharField(blank=True, max_length=100)
        last_name = models.CharField(blank=True, max_length=100)

    class Article(models.Model):
        author = models.ForeignKey(Author, null=True)
        text = models.CharField(blank=True, max_length=100, default='')
        objects = BulkInsert()

    class Note(models.Model):
        article = models.ForeignKey(Article, related_name='notes')
        text = models.CharField(blank=True, max_length=100)

    class Character(models.Model):
        char = models.CharField(blank=True, max_length=1)
        words = models.ManyToManyField(Article, related_name='characters')

    1. Article.objects.bulk_insert(text="this is gibberish",
            author={'first_name':'John', 'last_name':'Smith'},
            notes=[{'text':'very long'}, {'text':'very short'}],
            characters=[{'char':'t'}, {'char':'h'}, {'char':'i'}, {'char':'s'},
                        {'char':' '}, {'char':'i'}, {'char':'s'}, {'char':' '},
                        {'char':'g'}, {'char':'i'}, {'char':'b'}, {'char':'b'},
                        {'char':'e'}, {'char':'r'}, {'char':'i'}, {'char':'s'},
                        {'char':'h'}])

    2. Article.objects.bulk_insert(text="this is gibberish",
            author={'first_name':'John', 'last_name':'Smith'})

    3. Article.objects.bulk_insert(text="this is gibberish",
            author={'first_name':'John', 'last_name':'Smith'},
            notes=Note(text="just right"))

    Article.objects.bulk_insert_commit()

    On commit, the effect of the first call will be to first bulk insert an Author object and recover its primary key.
    Then it will insert the Article object referencing the Author object and recover the Article's primary key.
    Next it will insert two notes and associate them with the Article.
    Finally it will bulk insert all of the characters and establish each character's many_to_many
    relationship with the Article.

    Note that there are duplicate Character objects being added. The duplicates will all be treated as
    the same object because their "kwargs signatures" are identical.

    Also note that the second call to bulk_insert will NOT create another Article object because its
    kwargs signature matches the first call. Only fields stored in the Article's table are used to compute
    the signature.

    The third call to bulk_insert will add an additional note. Notice that it's a Note object
    with an unset primary key. It will be converted into a set of kwargs and bulk inserted accordingly.
    If its primary key were set, the key would be extracted to set the relationships but the object would
    not be re-inserted.

    For this example, the pre_save signal will be called *three* times for the Article object and post_save
    will be called once.

    #####################
    #   Real Example    #
    #####################
    Saving related search terms from the Yahoo.jp API in a general
    WebQuery table. A maximum of 100 related terms are retrieved at a time.

    class WebQuery(models.Model):
        query = models.CharField(blank=False, null=False, max_length=256)
        query_type = models.IntegerField(blank=False, null=False, choices=QUERY_TYPES)
        ...
        class Meta:
            unique_together = ("query", "query_type")

    class RelatedSearch(models.Model):
        original = models.CharField(blank=True, max_length=100, unique=True)
        queries = models.ManyToManyField(WebQuery, related_name='searches')
        ...
        objects = BulkInsert()

    Without bulk insert
    -------------------
    queries = []
    #get_or_create returns an (object, created) tuple
    r, created = RelatedSearch.objects.get_or_create(original=original_query)
    for q in get_related_searches(original_query):
        webquery, created = WebQuery.objects.get_or_create(query=q, query_type=YAHOOJP_RELATED)
        queries.append(webquery.id)
    r.queries.add(*queries)

    Worst Case: 101 objects are inserted using 204 db queries

    With Bulk Insert
    ----------------
    queries = []
    for q in get_related_searches(original_query):
        queries.append({'query': q, 'query_type': YAHOOJP_RELATED})
    RelatedSearch.objects.bulk_insert(original=original_query, queries=queries)
    RelatedSearch.objects.bulk_insert_commit()

    Worst Case: 101 objects are inserted with 6 db queries

    #####################
    #       Notes       #
    #####################
    Arguments to bulk_insert can include the underlying model's fields, of course, but also
    any related name specified by a related class. In other words, you can treat relationships
    defined on other models as "fields" on this model, similar to QuerySets.

    If the object you wish to specify a relationship with doesn't exist, simply provide a
    dict of arguments or a model instance with an unset primary key and it will be queued and
    bulk inserted accordingly.

    You don't need to add BulkInsert as a manager to each class you wish to perform bulk
    inserts with - just the class you want to perform bulk inserts _relative to_.
    If, when bulk inserting a list of related objects, it cannot find a BulkInsert manager,
    it will add one to that class and perform the insert.

    To execute the actual insert, call bulk_insert_commit.
    This will execute the insert using an INSERT statement or by bulk loading into the
    database from a temp file. The number of queued objects must exceed a threshold
    (1000 by default) before the insert is performed from a file.

    For the file insert to work, write permissions are needed in the current working directory
    or settings.BULK_INSERT_DIR.
    Also, the mysql user needs file privileges, i.e.:
        GRANT FILE ON *.* TO 'user'@'localhost';
    Note that for MySQL, the FILE privilege can only be granted globally.

    Regardless of the number of objects and type of relationships, this will make
    1 database call to perform the insert per table affected and N/100 calls per table
    to recover primary keys, where N is the number of rows to be inserted.

    Generic Foreign Keys Partially Supported - Need a fix like Ticket #5826
    Currently generic foreign keys can only be set by manually specifying the content_type and object_id.
    On a model instance with a GFK, you can do something like:
        instance.element = object
    instead of:
        instance.content_type = ContentType.objects.get_for_model(object.__class__)
        instance.object_id = object.id
    There currently isn't a way to recover, from the GFK object, the field names that it points to.
    So the convenient form of:
        Model.objects.bulk_insert(element={'name': 'Jim'})
    isn't possible. Instead, create and save the Jim object, collect its id, and use:
        Model.objects.bulk_insert(content_type=ctype, object_id=id)

    #####################
    #      Gotchas      #
    #####################
    -- Data types - For MySQL, TEXT and BLOB types cannot be inserted with LOAD DATA INFILE.
       To ensure that the file insert isn't used, specify a negative threshold with
       bulk_insert_threshold()

    -- pre_save signal - It's very possible that pre_save will be called more than once
       for the same object when inserting a lot of objects or adding a lot of relationships,
       e.g. if you are programmatically generating values and bulk insert object A, pre_save
       will be called. If you bulk insert A again with many_to_many relationship B, pre_save
       will be called again.
       If this is undesirable, specify send_pre_save=False when calling bulk_insert()

    -- In a pre_save function, all changes to instance values will be recorded. However, be
       careful with computations that produce different results based on time. If pre_save is
       called three times for what is nominally the same object and three different values are
       set, they will be treated as 3 different objects.
       If this is undesirable, specify send_pre_save=False when calling bulk_insert()

    -- DateTime, Date, and Time fields - If using datetime.now as your default value,
       one value for "now" is precomputed and used for all inserts, i.e. every date or time
       field that uses datetime.now or auto_now or auto_now_add will have the same value.
       Think of them as all being inserted at the same instant.

    -- Default values that call a function - If you compute a dynamic default value, know
       that it will be computed once and cached for all inserted objects.

    #####################
    #  Self Reference   #
    #####################
    Self references operate the same as any other reference, but there are some powerful
    gotchas if you get too creative. This doesn't handle the *general* case of limitless
    recursive relationships. If you start seeing warnings and KeyErrors, simplify your inserts.

    Symmetrical is honored for self referencing m2m fields. The direction of relationships can be
    bulk inserted by specifying the m2m field name or the related name.
    In the example below, this is done by setting 'friends' or 'followers'.

    Given this class loaded with self references:

    class Player(models.Model):
        first_name = models.CharField(blank=True, max_length=100)
        last_name = models.CharField(blank=True, max_length=100)
        friends = models.ManyToManyField('self', symmetrical=False, related_name='followers', null=True)
        favorite = models.ForeignKey('self', related_name='fans', null=True)
        antiself = models.OneToOneField('self', related_name='realme', null=True)
        objects = BulkInsert()

    And this set of dicts representing players and their relationships:

    other_players = [{'first_name':"Babe", 'last_name':'Ruth',
                      'friends': [{'first_name': 'Cal', 'last_name': 'Ripkin'},
                                  {'first_name': 'Willie', 'last_name': 'Mays'}]},
                     {'first_name':"Hank", 'last_name':'Aaron'}]

    players = [{'first_name':'Mark', 'last_name':'McGuire', 'followers':other_players,
                'antiself': {'first_name': 'Willie', 'last_name': 'Mays'}},
               {'first_name':'Sammy', 'last_name':'Sosa', 'favorite':{'first_name':'Hank', 'last_name':'Aaron'}},
               {'first_name':'Johnny', 'last_name':'Bench', 'antiself':{'first_name':'Hank', 'last_name':'Aaron'}},
               {'first_name': 'Mickey', 'last_name': 'Mantle', 'friends': other_players}]

    for player in players:
        Player.objects.bulk_insert(**player)
    Player.objects.bulk_insert_commit()

    This is a contorted example that pushes the limits, but in its unnecessary complexity it shows what can be done.
    After commit, a total of *8* unique players will be added:

    Babe Ruth      favorite:None  antiself:None    friends:3 [Cal Ripkin, Willie Mays, Mark McGuire]
    Cal Ripkin     favorite:None  antiself:None    friends:0 []
    Willie Mays    favorite:None  antiself:None    friends:0 []
    Hank Aaron     favorite:None  antiself:None    friends:1 [Mark McGuire]
    Mark McGuire   favorite:None  antiself:Willie  friends:0 []
    Sammy Sosa     favorite:Hank  antiself:None    friends:0 []
    Johnny Bench   favorite:None  antiself:Hank    friends:0 []
    Mickey Mantle  favorite:None  antiself:None    friends:2 [Babe Ruth, Hank Aaron]

    The first thing to note is that even though some players are referenced more than once,
    their kwargs signatures (hash of all non-m2m field values) are identical, so each player
    is inserted only once.

    Be careful with self referential fk and one2one relations that are nested below the first level.
    Note that fk's and one2one's were set in 'players' but not in 'other_players'. Inserting fk and
    one2one self references breaks the one insert per table design of this manager because fk's and one2one's
    must be inserted first. To deeply nest self referential fk's and one2one's, use objects whose primary
    key is already set or just provide the primary key value directly.
    To do this on the nested many to many relations above:

    other_players = [{'first_name':"Babe", 'last_name':'Ruth', 'friends': [
                          {'first_name': 'Cal', 'last_name': 'Ripkin'},
                          {'first_name': 'Willie', 'last_name': 'Mays'}],
             ------->     'favorite': Player.objects.create(first_name='Bob', last_name='Jones'),
             ------->     'antiself': 3},
                     {'first_name':"Hank", 'last_name':'Aaron'}]

    If you can, avoid deeply nesting self references or provide model instances with primary keys.
    Because the kwargs signature is the only unique identifier, you may unwittingly create two
    versions of the same object or, worse, hit a KeyError because some distant
    self reference changed the kwargs signature of the object.

    You'll probably never need to do something so twisted, but if you do, those are the caveats.
    """

    def __init__(self):
        #Start the MRO lookup at BulkInsert so that Manager.__init__ runs
        super(BulkInsert, self).__init__()
        self.queue = {}
        self.order = {}
        #Special Handlers for Self Referencing Objects
        self.ref_queue = {}
        self.ref_order = {}
        self.ref_cache = {}
        self.related_fields = {}
        self.related_queue = {}
        self.related_classes = {}
        self.m2one_queue = {}
        self.m2one_fields = {}
        self.m2one_classes = {}
        self.m2m_queue = {}
        self.m2m_fields = {}
        self.m2m_classes = {}
        self.update_map = {}
        self.defaults = {}
        self.initialized = False
        self.now = datetime.datetime.now()
        if settings.DATABASE_ENGINE == 'mysql':
            self.backend = MySQL_BulkInsert()
        else:
            #dummy backend - does nothing
            self.backend = BulkInsertBackend()
        self.threshold = 1000

    def bulk_insert_now(self, now=None):
        if now is not None:
            self.now = now
        else:
            self.now = datetime.datetime.now()
        #Some default values may be invalidated by changing 'now'
        if self.initialized:
            self._collect_field_defaults()
        else:
            self._related_init()

    def bulk_insert_threshold(self, threshold=1000):
        self.threshold = threshold

    def bulk_insert(self, now=None, raw=False, clean_args=False, send_pre_save=True, _self_ref=False, **kwargs):
        """
        Hold kwargs in queue until bulk_insert_commit
        All field preprocessing is done unless raw=True
        Returns a hash for kwargs if clean_args=True
            Primarily for internal use
        Sends pre_save signal unless send_pre_save=False
        kwargs can include any field from the underlying model including any related_name's
        specified by related models
        """
        if not self.initialized:
            #Initialize is delayed until bulk_insert is first called to ensure
            #that all relationship hooks have been added to the underlying class
            self._related_init()
        if getattr(self, 'tempModel', None) is None:
            #Also covers the case where bulk_insert_now() already ran _related_init()
            self.tempModel = self.model()
        if now is not None:
            if now != self.now:
                self.now = now
                self._collect_field_defaults()
        #check for valid field names
        self._check_fields(kwargs=kwargs)
        #Determine which related fields are present
        fk_or_one2one = set(kwargs.keys()).intersection(set(self.related_fields.keys()))
        many_to_one = set(kwargs.keys()).intersection(set(self.m2one_fields.keys()))
        many_to_many = set(kwargs.keys()).intersection(set(self.m2m_fields.keys()))
        #pop off m2m and m2one names, the tempModel can't handle them
        m2m_dict = {}
        for name in many_to_many:
            m2m_dict[name] = kwargs.pop(name)
        m2one_dict = {}
        for name in many_to_one:
            m2one_dict[name] = kwargs.pop(name)
        #Pop off Foreign Key and OneToOne names if they need to be bulk inserted first
        related = []
        for name in fk_or_one2one:
            if isinstance(kwargs[name], dict):
                manager = self._find_bulk_manager(self.related_classes[name])
                sref = self.related_classes[name] == self.model
                arg_hash = manager.bulk_insert(now=self.now, clean_args=True, _self_ref=sref, **kwargs[name])
                related += [(name, arg_hash)]
                kwargs.pop(name)
            elif isinstance(kwargs[name], self.related_classes[name]) and getattr(kwargs[name], kwargs[name]._meta.pk.attname) is None:
                args = self._generate_args(kwargs[name])
                manager = self._find_bulk_manager(self.related_classes[name])
                sref = self.related_classes[name] == self.model
                arg_hash = manager.bulk_insert(now=self.now, clean_args=True, _self_ref=sref, **args)
                related += [(name, arg_hash)]
                kwargs.pop(name)
        #Temporary model for signal dispatch and field preprocessing
        for name in kwargs.keys():
            field = self.tempModel._meta.get_field(name)
            #A raw primary key given for a Foreign Key is set on the attname (e.g. author_id)
            if isinstance(field, ForeignKey) and isinstance(kwargs[name], (int, long)):
                setattr(self.tempModel, field.attname, kwargs[name])
                continue
            setattr(self.tempModel, field.name, kwargs[name])
        #Preprocess field data unless 'raw' specified on call
        #Special handling for defaults on date and time fields to ensure
        #proper formatting for primary key recovery
        watch = {}
        for f in self.tempModel._meta.fields:
            val = getattr(self.tempModel, f.attname)
            watch[f.name] = val
            if isinstance(f, AutoField):
                if f.name in kwargs.keys():
                    kwargs[f.name] = f.get_db_prep_save(raw and val or f.pre_save(self.tempModel, True))
            elif f.name in kwargs.keys():
                kwargs[f.name] = f.get_db_prep_save(raw and val or f.pre_save(self.tempModel, True))
            else:
                kwargs[f.name] = self.defaults[f.name]
        #Presave could be called more than once for the same object
        if send_pre_save:
            dispatcher.send(signal=signals.pre_save, sender=self.tempModel.__class__, instance=self.tempModel)
            #Check for changes from pre_save
            for f in [field for field in self.tempModel._meta.fields if field.name in kwargs]:
                #Re-read the value; 'val' from the loop above belongs to the last field only
                new_val = getattr(self.tempModel, f.attname)
                if watch[f.name] != new_val:
                    kwargs[f.name] = f.get_db_prep_save(raw and new_val or f.pre_save(self.tempModel, True))
        #hash to identify this arg:value set
        key = hash_dict(kwargs)
        #Objects with the same arg:value signature are considered
        #the same object
        if _self_ref:
            if key not in self.ref_queue:
                self.ref_queue[key] = kwargs
                self.ref_order[key] = len(self.ref_queue)
        elif key not in self.queue:
            self.queue[key] = kwargs
            self.order[key] = len(self.queue)
        #With the key computed, associate it with any Fk's and one2one's
        #that will be inserted later
        for name, arg_hash in related:
            if arg_hash not in self.related_queue[name]:
                self.related_queue[name][arg_hash] = []
            self.related_queue[name][arg_hash] += [key]
        for name in many_to_one:
            self._m2one_enqueue(name, m2one_dict[name], key)
        for name in many_to_many:
            self._m2m_enqueue(name, m2m_dict[name], key)
        self._clear_tempModel()
        if clean_args:
            return key

    def _clear_tempModel(self):
        for field in self.tempModel._meta.fields:
            val = field.get_default()
            setattr(self.tempModel, field.attname, val)

    def bulk_insert_commit(self, now=None, autoclobber=False, depth=0, max_depth=5, send_post_save=True, _self_ref=False, **kwargs):
        """
        Bulk inserts all queued objects and relations to the database with one insert per affected table
        and N/100 selects to find the primary keys where N is the number of inserted rows
        If autoclobber is False, the default, the insert is performed with IGNORE. Any object that duplicates one
        already in the database is not reinserted but any new relationships will be
        If autoclobber is True, the insert is performed with REPLACE, clobbering any duplicates
        If autoclobber is None, no checking is done and any duplicates will raise a Database Integrity Error
        If kwargs is specified, its values are used to override any model defaults
        """
        if not self.queue or depth > max_depth:
            #Nested callers unpack two values, so return the same shape as the normal exit
            return {}, {}
        self._check_fields(no_related=True, kwargs=kwargs)
        many_to_many = filter(lambda x: x['list'] != [], self.m2m_queue.values()) != []
        many_to_one = filter(lambda x: x != [], self.m2one_queue.values()) != []
        related = filter(lambda x: x != [], self.related_queue.values()) != []
        m2m_depth = filter(lambda x: x['bulk'], self.m2m_queue.values())
        try:
            if not _self_ref:
                #Foreign Key and OneToOne are inserted first
                #Their primary keys are needed to save the root objects
                if related:
                    self._fk_one2one_insert(depth, max_depth, autoclobber)
                    #inserting a fk or one2one invalidates our kwargs signatures
                    #computing new hashes and mapping to the old hash for m2m and m2one
                    copy = {}
                    order_copy = {}
                    self.update_map = {}
                    for key, value in self.queue.items():
                        new_key = hash_dict(value)
                        self.update_map[key] = new_key
                        copy[new_key] = value
                        try:
                            order_copy[new_key] = self.order[key]
                        except KeyError:
                            #For nested inserts with self references, objects
                            #inserted across those references have no order information
                            order_copy[new_key] = sys.maxint
                    self.queue = copy
                    self.order = order_copy
                order = self.order.items()
                order.sort(lambda x,y: x[1] - y[1])
            else:
                #Related Self References are bunched together and inserted once
                #however, commit will be called for every self referencing field
                #If the commit has already been done, return the cached queue
                if not self.ref_queue:
                    return self.ref_cache, self.update_map
                order = self.ref_order.items()
                order.sort(lambda x,y: x[1] - y[1])
            #Saving the root objects
            queue = _self_ref and self.ref_queue or self.queue
            if len(queue) <= self.threshold or self.threshold < 0:
                self.backend.insert(table=self.model._meta.db_table,
                                    fields=[f for f in self.model._meta.fields if not isinstance(f, AutoField)],
                                    queue=queue,
                                    order=order,
                                    autoclobber=autoclobber)
            else:
                self.backend.write_temp_file(fields=[f for f in self.model._meta.fields if not isinstance(f, AutoField)],
                                             queue=queue)
                self.backend.insert_from_file(table=self.model._meta.db_table,
                                              columns=[f.column for f in self.model._meta.fields if not isinstance(f, AutoField)],
                                              autoclobber=autoclobber)
            self._recover_pks(_self_ref)
            if not _self_ref:
                if many_to_many:
                    self._many_to_many_insert(depth, max_depth, autoclobber)
                if many_to_one:
                    self._many_to_one_insert(depth, max_depth, autoclobber)
        except Exception:
            #Clear all queues, then re-raise the original exception with its traceback
            self.reset()
            raise
        #Dispatch Post Save Signals
        if send_post_save:
            values = _self_ref and self.ref_queue.values() or self.queue.values()
            for args in values:
                for name in args.keys():
                    setattr(self.tempModel, self.tempModel._meta.get_field(name).attname, args[name])
                dispatcher.send(signal=signals.post_save, sender=self.tempModel.__class__, instance=self.tempModel)
                self._clear_tempModel()
        if depth > 0:
            queue = dict(_self_ref and self.ref_queue or self.queue)
            update_map = self.update_map
            self.reset(_self_ref)
            if _self_ref:
                self.ref_cache = queue
            return queue, update_map
        self.reset()
        return {}, {}

    def reset(self, _self_ref=False):
        """
        Close and remove any temp files
        Reset all queues and field maps
        """
        if _self_ref:
            self.ref_queue = {}
            self.ref_order = {}
            self.backend.clear()
            return
        self.queue = {}
        self.ref_queue = {}
        for key in self.m2one_queue.keys():
            self.m2one_queue[key] = []
        for key in self.m2m_queue.keys():
            self.m2m_queue[key] = {'list':[], 'bulk':False}
        for key in self.related_queue.keys():
            self.related_queue[key] = {}
        self.update_map = {}
        self.order = {}
        self.ref_order = {}
        self.ref_cache = {}
        self.backend.clear()

    ###################
    # PRIVATE METHODS #
    ###################
    def _related_init(self):
        """
        Find all related forward and reverse Foreign Key, OneToOne,
        ManyToMany and ManyToOne relationships and cache the relevant
        fields and model classes
        """
        self.initialized = True
        for f in self.model._meta.fields:
            if isinstance(f, ForeignKey) or isinstance(f, OneToOneField):
                self.related_classes[f.name] = f.rel.to
                self.related_fields[f.name] = f
                self.related_queue[f.name] = {}
        for r in self.model._meta.get_all_related_objects():
            name = r.field.rel.related_name or r.model.__name__.lower() + '_set'
            self.m2one_classes[name] = r.model
            self.m2one_fields[name] = r.field
            self.m2one_queue[name] = []
        for f in self.model._meta.many_to_many:
            self.m2m_classes[f.name] = f.rel.to
            self.m2m_fields[f.name] = f
            self.m2m_queue[f.name] = {'list':[], 'bulk':False}
        for m2m in self.model._meta.get_all_related_many_to_many_objects():
            name = m2m.field.rel.related_name or m2m.model.__name__.lower() + '_set'
            self.m2m_classes[name] = m2m.model
            self.m2m_fields[name] = m2m.field
            self.m2m_queue[name] = {'list':[], 'bulk':False}
        self._collect_field_defaults()

    def _collect_field_defaults(self):
        """
        Collect default values for each field
        """
        self.defaults = {}
        scrapModel = self.model()
        for f in scrapModel._meta.fields:
            if isinstance(f, DateTimeField) or isinstance(f, DateField) or isinstance(f, TimeField):
                if (f.default == datetime.datetime.now or f.auto_now or f.auto_now_add):
                    if isinstance(f, DateTimeField):
                        self.defaults[f.name] = self.now.strftime('%Y-%m-%d %H:%M:%S')
                    elif isinstance(f, DateField):
                        self.defaults[f.name] = self.now.strftime('%Y-%m-%d')
                    elif isinstance(f, TimeField):
                        self.defaults[f.name] = self.now.strftime('%H:%M:%S')
                    continue
            if not isinstance(f, AutoField):
                self.defaults[f.name] = scrapModel._meta.get_field(f.name).get_db_prep_save(f.pre_save(scrapModel, True))

    def _check_fields(self, no_related=False, kwargs={}):
        """
        Check that all fields given to bulk_insert and bulk_insert_commit are valid
        """
        if no_related:
            valid_fields = set([f.name for f in self.model._meta.fields])
        else:
            valid_fields = set([f.name for f in self.model._meta.fields] + self.m2m_fields.keys() + self.m2one_fields.keys())
        invalid_fields = set(kwargs.keys()) - valid_fields
        assert len(invalid_fields) == 0, \
            "Invalid field(s): %s . Acceptable field values: %s . All Arguments: %s" %\
            (', '.join(invalid_fields), ', '.join(valid_fields), ', '.join([str(t) for t in kwargs.items()]))

    def _fk_one2one_insert(self, depth, max_depth, autoclobber):
        """
        Commit any related fk or one2one objects to the database
        Calls bulk_insert_commit on the related class
        """
        for name in self.related_queue.keys():
            if self.related_queue[name]:
                manager = self._find_bulk_manager(self.related_classes[name])
                self_ref = self.related_classes[name] == self.model
                r_queue, update_map = manager.bulk_insert_commit(now=self.now, depth=depth+1,
                                            max_depth=max_depth, autoclobber=autoclobber, _self_ref=self_ref)
                if r_queue:
                    pk_name = self.related_classes[name]._meta.pk.name
                    for arg_hash, keys in self.related_queue[name].items():
                        arg_hash = update_map.get(arg_hash, arg_hash)
                        for key in keys:
                            try:
                                self.queue[key][name] = r_queue[arg_hash][pk_name]
                            except KeyError:
                                if key in self.ref_cache.keys():
                                    print >> sys.stderr, "Warning: Too many recursive self references on a Foreign Key or OneToOne field class:%s field:%s - Value NOT Saved" % (self.model, name)
                                else:
                                    #Re-raise the original KeyError with its traceback intact
                                    raise
    def _many_to_one_insert(self, depth, max_depth, autoclobber):
        """
        Delayed bulk insert and commit of many to one relations
        """
        for name in self.m2one_queue.keys():
            manager = self._find_bulk_manager(self.m2one_classes[name])
            non_related_name = None
            for f in self.m2one_classes[name]._meta.fields:
                try:
                    if f.rel.related_name == name:
                        non_related_name = f.name
                        break
                except AttributeError:
                    #Not a relation field: f.rel is None
                    pass
            else:
                #No field declared this related_name
                non_related_name = name + '_set'
            self_ref = self.m2one_classes[name] == self.model
            for args_list, key in self.m2one_queue[name]:
                key = self.update_map.get(key, key)
                pk = self.queue[key][self.model._meta.pk.name]
                for args in args_list:
                    args[non_related_name] = pk
                    manager.bulk_insert(now=self.now, _self_ref=self_ref, **args)
            manager.bulk_insert_commit(now=self.now, depth=depth+1, max_depth=max_depth, autoclobber=autoclobber, _self_ref=self_ref)

    def _many_to_many_insert(self, depth, max_depth, autoclobber):
        """
        Inserts all ManyToMany related objects and their relationships
        """
        for name in self.m2m_queue.keys():
            if self.m2m_queue[name]['bulk']:
                self_ref = self.m2m_classes[name] == self.model
                manager = self._find_bulk_manager(self.m2m_classes[name])
                r_queue, update_map = manager.bulk_insert_commit(now=self.now, depth=depth+1, max_depth=max_depth, autoclobber=autoclobber, _self_ref=self_ref)
                if r_queue:
                    for entry in self.m2m_queue[name]['list']:
                        for arg_hash in entry['bulk']:
                            entry['values'] += [r_queue[arg_hash][self.m2m_classes[name]._meta.pk.name]]
                else:
                    warning = "Max recursion depth, %s, exceeded. Some relationships between %s and %s may not be defined.\n"
                    sys.stderr.write(warning % (max_depth, self.model.__name__, self.m2m_classes[name].__name__))
            if self.m2m_queue[name]['list']:
                table = self.m2m_fields[name].m2m_db_table()
                columns = [self.m2m_fields[name].m2m_column_name(), self.m2m_fields[name].m2m_reverse_name()]
                #Determine the direction of the ManyToMany Relationship
                #The special case of a self referential field requires further checking
                if self.m2m_fields[name].rel.to == self.model:
                    if self.m2m_classes[name] != self.model or not filter(lambda x: x.name==name, self.model._meta.many_to_many):
                        columns.reverse()
                #This value only matters for self referential relations
                symmetrical = False
                if self.m2m_classes[name] == self.model:
                    if getattr(self.m2m_fields[name].rel, 'symmetrical', False):
                        symmetrical = True
                    for key in self.ref_cache.keys():
                        self.queue[key] = self.ref_cache[key]
                if len(self.m2m_queue[name]['list']) <= self.threshold or self.threshold < 0:
                    self.backend.insert_m2m(table, self.model._meta.pk.name,
                        columns, self.queue, self.m2m_queue[name],
                        self.update_map, autoclobber, symmetrical)
                else:
                    self.backend.write_m2m_temp_file(self.model._meta.pk.name, self.queue, self.m2m_queue[name], self.update_map, symmetrical)
                    self.backend.insert_from_file(table, columns, autoclobber)

    def _recover_pks(self, _self_ref=False):
        """
        Store the recovered primary keys in the local queue
        Recover them 100 at a time
        """
        fields = [f for f in self.model._meta.fields if not isinstance(f, AutoField)]
        qn = connection.ops.quote_name
        cursor = connection.cursor()
        if self.model._meta.pk in fields:
            return #No keys to recover
        recovery_fields = fields + [self.model._meta.pk]
        table = self.model._meta.db_table
        primary = self.model._meta.pk
        pk_index = len(recovery_fields) - 1
        queue = _self_ref and self.ref_queue or self.queue
        recover_limit = 100
        start = 0
        for end in xrange(recover_limit, len(queue)+recover_limit, recover_limit):
            where = []
            query_data = []
            for kwargs in queue.values()[start:end]:
                temp = []
                for f in fields:
                    query_data += [kwargs[f.name]]
                    if kwargs[f.name] is None:
                        temp += ['%s is %%s' % (qn(f.column))]
                    else:
                        temp += ['%s = %%s' % (qn(f.column))]
                where += ['(' + ' AND '.join(temp) + ')']
            where = ' OR '.join(where)
            sql = "SELECT %s FROM %s WHERE " % \
                (','.join(["%s.%s" % (qn(table), qn(f.column)) for f in recovery_fields]), qn(table)) + \
                where + " ORDER BY %s" % qn(primary.column)
            cursor.execute(sql, query_data)
            rows = cursor.fetchall()
            result = []
            for row in rows:
                temp = {}
                for f, r in zip(recovery_fields[:-1], row):
                    if isinstance(r, datetime.datetime):
                        r = r.strftime('%Y-%m-%d %H:%M:%S')
                    elif isinstance(r, datetime.date):
                        r = r.strftime('%Y-%m-%d')
                    elif isinstance(r, datetime.time):
                        r = r.strftime('%H:%M:%S')
                    temp[f.name] = r
                try:
                    queue[hash_dict(temp)][primary.name] = row[pk_index]
                except KeyError:
                    pass
            for q in queue.values()[start:end]:
                if primary.name not in q:
                    raise Exception, "Integrity Error. Object %s could not be inserted" % ', '.join([unicode(k).encode('utf8') + ' : ' + unicode(v).encode('utf8') for k,v in q.items()])
            start = end

    def _find_bulk_manager(self, cls):
        """
        Locate a bulk manager on a related class
        If there isn't one, add one
        """
        for attr in dir(cls):
            try:
                m = getattr(cls, attr)
                if isinstance(m, self.__class__):
                    m.bulk_insert_threshold(self.threshold)
                    return m
            except Exception:
                #Some class attributes cannot be read from the class itself; skip them
                pass
        cls.add_to_class('_bulk_manager', self.__class__())
        cls._bulk_manager.bulk_insert_threshold(self.threshold)
        return cls._bulk_manager

    def _m2one_enqueue(self, name, value, key):
        """
        Queue for the many side of ManyToOne relationships
        Can't do anything with them until the primary key for
        the root side is available
        """
        if not isinstance(value, list):
            value = [value]
        self.m2one_queue[name] += [(value, key)]

    def _generate_args(self, obj):
        """
        If we have been supplied a model object with no primary key,
        convert it into a kwargs dictionary
        """
        args = {}
        for f in obj._meta.fields:
            if isinstance(f, AutoField):
                continue
            args[f.name] = f.get_db_prep_save(f.pre_save(obj, True))
        return args

    def _m2m_enqueue(self, name, value, key):
        """
        ManyToMany Queue
        This handles dicts of args, integer ids, and model objects with or without primary keys
        Or a list with any combination of the above
        """
        cls = self.m2m_classes[name]
        bulk = []
        self_ref = cls == self.model
        pk = self.model._meta.pk.name
        if not isinstance(value, list):
            value = [value]
        new_value = []
        for v in value:
            if isinstance(v, dict):
                if v.get(pk, None):
                    new_value += [v[pk]]
                manager = self._find_bulk_manager(cls)
                bulk += [manager.bulk_insert(now=self.now, clean_args=True, _self_ref=self_ref, **v)]
            elif isinstance(v, cls):
                if getattr(v, pk, None) is None:
                    args = self._generate_args(v)
                    manager = self._find_bulk_manager(cls)
                    #Append, so earlier entries queued in this call are kept
                    bulk += [manager.bulk_insert(now=self.now, clean_args=True, _self_ref=self_ref, **args)]
                else:
                    new_value += [getattr(v, pk)]
            elif isinstance(v, (int, long)):
                #Test the list item, not the enclosing list
                new_value += [long(v)]
            else:
                raise ValueError, "ManyToMany list argument, %s=%s, must contain numbers, dicts or instances of %s" %\
                    (name, value, cls.__name__)
        if bulk:
            self.m2m_queue[name]['bulk'] = True
        self.m2m_queue[name]['list'] += [{'values':new_value, 'key':key, 'bulk':bulk}]

class BulkInsertBackend(object):
    def __init__(self):
        self.temp_file = None

    def write_temp_file(self, fields, queue):
        pass

    def write_m2m_temp_file(self, primary_key_name, queue, m2m_queue, update_map, symmetrical):
        pass

    def insert_from_file(self, table, columns, autoclobber=False):
        pass

    #Signature matches MySQL_BulkInsert.insert
    def insert(self, table, fields, queue, order, autoclobber=False):
        pass

    def insert_m2m(self, table, primary_key_name, columns, queue, m2m_queue, update_map, autoclobber, symmetrical):
        pass

    def clear(self):
        #temp_file is None until a temp file has been written
        try:
            os.close(self.temp_file[0])
        except (TypeError, OSError):
            pass
        try:
            os.remove(self.temp_file[1])
        except (TypeError, OSError):
            pass

class MySQL_BulkInsert(BulkInsertBackend):
    def write_temp_file(self, fields, queue):
        """
        Creates and writes input file for LOAD DATA INFILE
        Used everywhere except for saving ManyToMany relationships
        """
        #Tab delimited fields, newlines mark rows
        self.temp_file = mkstemp(suffix='.import', prefix='bulk',
            dir=getattr(settings, 'BULK_INSERT_DIR', '.'))
        assert self.temp_file is not None
        fh, path = self.temp_file
        line = u'\t'.join(["%s"] * len(fields))
        for kwargs in queue.values():
            values = []
            for f in fields:
                if kwargs[f.name] is None:
                    val = u'\\N'
                else:
                    #Escape backslashes first so the tab and newline escapes stay intact
                    val = unicode(kwargs[f.name]).replace(u'\\', u'\\\\').replace(u'\t', u'\\t').replace(u'\n', u'\\n')
                values.append(val)
            out = line % tuple(values) + u'\n'
            os.write(fh, out.encode('utf8'))
        os.close(fh)
        #World readable so the MySQL server process can open the file
        os.chmod(path, 0644)

    def write_m2m_temp_file(self, primary_key_name, queue, m2m_queue, update_map, symmetrical):
        """
        Creates and writes ManyToMany relationship input file for LOAD DATA INFILE
        """
        #Tab delimited fields, newlines mark rows
        #Write to the file incrementally
        self.temp_file = mkstemp(suffix='.import', prefix='bulk',
            dir=getattr(settings, 'BULK_INSERT_DIR', '.'))
        assert self.temp_file is not None
        fh, path = self.temp_file
        line = u'\t'.join(["%s"] * 2)
        for obj in m2m_queue['list']:
            for value in obj['values']:
                key = obj['key']
                if key in update_map:
                    key = update_map[key]
                out = line % (queue[key][primary_key_name], value) + u'\n'
                if symmetrical:
                    out += line % (value, queue[key][primary_key_name]) + u'\n'
                os.write(fh, out.encode('utf8'))
        os.close(fh)
        #World readable so the MySQL server process can open the file
        os.chmod(path, 0644)

    def insert_from_file(self, table, columns, autoclobber=False):
        """
        executes LOAD DATA INFILE
        Inserts Data from temp file created by write_temp_file
        or write_m2m_temp_file
        """
        qn = connection.ops.quote_name
        cursor = connection.cursor()
        if autoclobber is None:
            autoclobber = ''
        else:
            autoclobber = autoclobber and 'REPLACE' or 'IGNORE'
        query = "LOAD DATA INFILE '%s' %s INTO TABLE %s (%s)" % \
            (self.temp_file[1], autoclobber, qn(table),
             ','.join([qn(c) for c in columns]))
        cursor.execute(query)
        os.remove(self.temp_file[1])

    def insert_m2m(self, table, primary_key_name, columns, queue, m2m_queue, update_map, autoclobber, symmetrical):
        qn = connection.ops.quote_name
        cursor = connection.cursor()
        if autoclobber is None or autoclobber == True:
            autoclobber = ''
        else:
            autoclobber = 'IGNORE'
        sql = u'INSERT %s INTO %s (%s) ' % \
            (autoclobber, qn(table), ', '.join([qn(c) for c in columns]))
        value_list = []
        for obj in m2m_queue['list']:
            for value in obj['values']:
                key = update_map.get(obj['key'], obj['key'])
                value_list += [queue[key][primary_key_name], value]
                if symmetrical:
                    value_list += [value, queue[key][primary_key_name]]
        arg_string = ', '.join([u'(' + ','.join(['%s']*2) + ')'] * (len(value_list)/2))
        values = 'VALUES %s' % arg_string
        sql = sql + values
        cursor.execute(sql, value_list)

    def insert(self, table, fields, queue, order, autoclobber=False):
        """
        Bulk insert using INSERT
        ***Limited by max packet size on mysql server***
        """
        qn = connection.ops.quote_name
        cursor = connection.cursor()
        if autoclobber is None or autoclobber == True:
            autoclobber = ''
        else:
            autoclobber = 'IGNORE'
        sql = u'INSERT %s INTO %s (%s) ' % \
            (autoclobber, qn(table), ', '.join([qn(f.column) for f in fields]))
        value_list = []
        #Walk the queue in insertion order; the position itself is not needed here
        for key, _position in order:
            kwargs = queue[key]
            value_list += [kwargs[f.name] for f in fields]
        arg_string = ', '.join([u'(' + ','.join(['%s']*len(fields)) + ')'] * len(queue))
        values = 'VALUES %s' % arg_string
        sql = sql + values
        cursor.execute(sql, value_list)
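
The INSERT path above builds one multi-row statement, which is why it is limited by the server's max packet size. Below is a minimal standalone sketch of the same placeholder-building idea, split into fixed-size chunks so that no single statement grows without bound. It is not part of the snippet: `quote_name`, `chunked`, `build_inserts` and the `chunk_size` parameter are hypothetical names, the quoting helper is a simple stand-in for `connection.ops.quote_name`, and the sketch is written for Python 3 so it can be run on its own without Django or a database.

```python
from itertools import islice


def quote_name(name):
    """Quote an identifier MySQL-style (hypothetical stand-in for connection.ops.quote_name)."""
    return "`%s`" % name.replace("`", "``")


def chunked(rows, size):
    """Yield successive lists of at most `size` rows."""
    iterator = iter(rows)
    while True:
        chunk = list(islice(iterator, size))
        if not chunk:
            return
        yield chunk


def build_inserts(table, columns, rows, chunk_size=100, ignore=False):
    """Yield (sql, params) pairs, one multi-row INSERT per chunk of rows.

    `rows` is an iterable of dicts keyed by column name. Values are never
    interpolated into the SQL; they are returned as a flat parameter list
    matching the %s placeholders, ready for cursor.execute(sql, params).
    """
    head = "INSERT %sINTO %s (%s) VALUES " % (
        "IGNORE " if ignore else "",
        quote_name(table),
        ", ".join(quote_name(c) for c in columns),
    )
    row_placeholder = "(" + ", ".join(["%s"] * len(columns)) + ")"
    for chunk in chunked(rows, chunk_size):
        sql = head + ", ".join([row_placeholder] * len(chunk))
        params = [row[c] for row in chunk for c in columns]
        yield sql, params


if __name__ == "__main__":
    demo_rows = [
        {"name": "a", "rank": 1},
        {"name": "b", "rank": None},
        {"name": "c", "rank": 3},
    ]
    for sql, params in build_inserts("app_item", ["name", "rank"], demo_rows, chunk_size=2):
        print(sql, params)
```

Each yielded pair would be passed to `cursor.execute(sql, params)`. A suitable chunk size depends on row width and the server's packet limit, so the default of 100 here is only a placeholder.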
Comments
Just in case nobody realized it, this snippet is the total awesome! It's bulk-insert the django way, it's just the best!
#
Hey, great snippet.
Noticed a few BackwardsIncompatibleChanges problems. Here are my fixes that seemed to work:
That should do it.
#
The super in __init__ is incorrect: The class passed should be this class. Passing in the parent causes the call to models.Model.__init__ to be skipped. It should read:
Without that change you get:
The previous comment's mention of self._inherited is probably wrong, but I'm not sure if it should actually be set to True.
#
Hi
change line 340 to:
if settings.DATABASES["default"]["ENGINE"] == 'django.db.backends.mysql':
change line 942 to:
def insert(self, table, fields, queue, order, autoclobber=False):
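
A rough sketch of the engine check from the first change, for anyone who needs it to work with both settings styles. It assumes the older single-database `DATABASE_ENGINE = 'mysql'` setting and the newer `DATABASES` dictionary; `database_engine` and `uses_mysql` are made-up helper names, and a plain `SimpleNamespace` stands in for `django.conf.settings` so the example runs without Django (Python 3).

```python
from types import SimpleNamespace

# Engine strings that mean "MySQL" in the old and the new settings style
MYSQL_ENGINES = ("mysql", "django.db.backends.mysql")


def database_engine(settings, alias="default"):
    """Return the configured engine string, or None if it cannot be found."""
    databases = getattr(settings, "DATABASES", None)
    if databases:
        # Newer style: DATABASES = {'default': {'ENGINE': ...}}
        return databases.get(alias, {}).get("ENGINE")
    # Older style: a single DATABASE_ENGINE setting
    return getattr(settings, "DATABASE_ENGINE", None)


def uses_mysql(settings, alias="default"):
    """True if the given settings object points the alias at a MySQL backend."""
    return database_engine(settings, alias) in MYSQL_ENGINES


if __name__ == "__main__":
    new_style = SimpleNamespace(DATABASES={"default": {"ENGINE": "django.db.backends.mysql"}})
    old_style = SimpleNamespace(DATABASE_ENGINE="mysql")
    print(uses_mysql(new_style), uses_mysql(old_style))
```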
#
Would this still work with Django>=1.5? And would it be hard to make it work for other DBMS, for example postgresql?
I think this is more than a snippet and it would perfectly fit on github where we could enhance and expand it, this is awesome!!
#