当前位置:主页 > 软件编程 > Python代码 >

django 链接多个数据库 并使用原生sql实现

时间:2020-10-01 15:47:15 | 栏目:Python代码 | 点击:

settings文件如下:

DATABASES = {
  'default': {
    'ENGINE': 'django.db.backends.sqlite3',
    'NAME': os.path.join(BASE_DIR, 'db.sqlite3'),
  },
  'db1': {  # 配置第二个数据库节点名称
    'ENGINE': 'django.db.backends.oracle',
    'NAME': 'devdb',
    'USER': 'hysh',
    'PASSWORD': 'hysh',
    'HOST': '192.168.191.3',
    'PORT': '1521',
  },
}

查找Django的文档:

from django.db import connection
 
def my_custom_sql(self):
  with connection.cursor() as cursor:
    cursor.execute("UPDATE bar SET foo = 1 WHERE baz = %s", [self.baz])
    cursor.execute("SELECT foo FROM bar WHERE baz = %s", [self.baz])
    row = cursor.fetchone()
 
  return row

上述方法是设置中如果有多个数据库,会默认使用 default,当你想使用指定的数据库连接时,引入的对象就变成了connections !

from django.db import connection
 
def my_custom_sql(self):
  with connection.cursor() as cursor:
    cursor.execute("UPDATE bar SET foo = 1 WHERE baz = %s", [self.baz])
    cursor.execute("SELECT foo FROM bar WHERE baz = %s", [self.baz])
    row = cursor.fetchone()
 
  return row

之后再进行操作。

补充知识:Django多数据源接入类

from rest_framework.generics import GenericAPIView
from rest_framework.response import Response
from rest_framework import status
from django.db import transaction


from .contants import db_dict

contants.py的内容

(

import cx_Oracle
import pymysql

# 定义一个数据库类型&引擎的字典,
db_dict = {'mysql':pymysql,'Oracle':cx_Oracle}

)
from .models import DataSystem,Rule


class DBconnectView(GenericAPIView):
  __DBtype = db_dict
  def get(self,request,pk,rule_id):

    # 通过传入的id进行对应的数据库链接
    self.datas = DataSystem.objects.get(pk=pk)
    self.url = self.datas.url
    self.username = self.datas.username
    self.password = self.datas.password_enc
    self.DBname = self.datas.name
    self.DBtype = self.__DBtype[self.datas.type]

    # 获取check_code规则
    self.ruledatas = Rule.objects.get(id=rule_id)
    self.check_code = self.ruledatas.check_code
    # db = __import__(self.DBtype)

    try:
      conn = self.DBtype.connect(host=self.url,user=self.username,password=self.password,database=self.DBname)
      # 链接成功后创建一个游标
      cs_ms = conn.cursor()
    except Exception as e:
      raise e
    else:
      # 明显的开启事务
      with transaction.atomic():
        # 在安全的地方,创建保存点,将来操作数据库失败回滚到此
        save_id = transaction.savepoint()

        try:

          # 获取一个元组
          db_ret = cs_ms.execute(self.check_code)
        except Exception as e:
          transaction.savepoint_rollback(save_id)
          raise e
        else:
          db_set = db_ret.fetchone()
          # transaction.savepoint_commit(save_id)
    finally:
      cs_ms.close()
      conn.close()

    return Response({'pk':pk,'rule_id':rule_id})

您可能感兴趣的文章:

相关文章