alydns.py 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193
  1. # coding:utf-8
  2. import base64
  3. import urllib
  4. import hmac
  5. import pytz
  6. import datetime
  7. import random
  8. import string
  9. import json
  10. import sys
  11. pv = "python2"
  12. #python2
  13. if sys.version_info[0] < 3:
  14. from urllib import quote
  15. from urllib import urlencode
  16. import hashlib
  17. else:
  18. from urllib.parse import quote
  19. from urllib.parse import urlencode
  20. from urllib import request
  21. pv = "python3"
  22. ACCESS_KEY_ID = 'access_key_id'
  23. ACCESS_KEY_SECRET = 'access_key_secret'
  24. class AliDns:
  25. def __init__(self, access_key_id, access_key_secret, domain_name):
  26. self.access_key_id = access_key_id
  27. self.access_key_secret = access_key_secret
  28. self.domain_name = domain_name
  29. @staticmethod
  30. def generate_random_str(length=14):
  31. """
  32. 生成一个指定长度(默认14位)的随机数值,其中
  33. string.digits = "0123456789'
  34. """
  35. str_list = [random.choice(string.digits) for i in range(length)]
  36. random_str = ''.join(str_list)
  37. return random_str
  38. @staticmethod
  39. def percent_encode(str):
  40. res = quote(str.encode('utf-8'), '')
  41. res = res.replace('+', '%20')
  42. res = res.replace('*', '%2A')
  43. res = res.replace('%7E', '~')
  44. return res
  45. @staticmethod
  46. def utc_time():
  47. """
  48. 请求的时间戳。日期格式按照ISO8601标准表示,
  49. 并需要使用UTC时间。格式为YYYY-MM-DDThh:mm:ssZ
  50. 例如,2015-01-09T12:00:00Z(为UTC时间2015年1月9日12点0分0秒)
  51. :return:
  52. """
  53. utc_tz = pytz.timezone('UTC')
  54. time = datetime.datetime.now(tz=utc_tz).strftime('%Y-%m-%dT%H:%M:%SZ')
  55. return time
  56. @staticmethod
  57. def sign_string(url_param):
  58. percent_encode = AliDns.percent_encode
  59. sorted_url_param = sorted(url_param.items(), key=lambda x: x[0])
  60. can_string = ''
  61. for k, v in sorted_url_param:
  62. can_string += '&' + percent_encode(k) + '=' + percent_encode(v)
  63. string_to_sign = 'GET' + '&' + '%2F' + '&' + percent_encode(can_string[1:])
  64. return string_to_sign
  65. @staticmethod
  66. def access_url(url):
  67. if pv == "python2" :
  68. f = urllib.urlopen(url)
  69. result = f.read().decode('utf-8')
  70. #print(result)
  71. return json.loads(result)
  72. else :
  73. req = request.Request(url)
  74. with request.urlopen(req) as f:
  75. result = f.read().decode('utf-8')
  76. #print(result)
  77. return json.loads(result)
  78. def visit_url(self, action_param):
  79. common_param = {
  80. 'Format': 'json',
  81. 'Version': '2015-01-09',
  82. 'AccessKeyId': self.access_key_id,
  83. 'SignatureMethod': 'HMAC-SHA1',
  84. 'Timestamp': AliDns.utc_time(),
  85. 'SignatureVersion': '1.0',
  86. 'SignatureNonce': AliDns.generate_random_str(),
  87. 'DomainName': self.domain_name,
  88. }
  89. url_param = dict(common_param, **action_param)
  90. string_to_sign = AliDns.sign_string(url_param)
  91. hash_bytes = self.access_key_secret + "&"
  92. if pv == "python2":
  93. h = hmac.new(hash_bytes, string_to_sign, digestmod=hashlib.sha1)
  94. else :
  95. h = hmac.new(hash_bytes.encode('utf-8'), string_to_sign.encode('utf-8'), digestmod='SHA1')
  96. signature = base64.encodestring(h.digest()).strip()
  97. url_param.setdefault('Signature', signature)
  98. url = 'https://alidns.aliyuncs.com/?' + urlencode(url_param)
  99. #print(url)
  100. return AliDns.access_url(url)
  101. # 显示所有
  102. def describe_domain_records(self):
  103. """
  104. 最多只能查询此域名的 500条解析记录
  105. PageNumber 当前页数,起始值为1,默认为1
  106. PageSize 分页查询时设置的每页行数,最大值500,默认为20
  107. :return:
  108. """
  109. action_param = dict(
  110. Action='DescribeDomainRecords',
  111. PageNumber='1',
  112. PageSize='500',
  113. )
  114. result = self.visit_url(action_param)
  115. return result
  116. # 增加解析
  117. def add_domain_record(self, type, rr, value):
  118. action_param = dict(
  119. Action='AddDomainRecord',
  120. RR=rr,
  121. Type=type,
  122. Value=value,
  123. )
  124. result = self.visit_url(action_param)
  125. return result
  126. # 修改解析
  127. def update_domain_record(self, id, type, rr, value):
  128. action_param = dict(
  129. Action="UpdateDomainRecord",
  130. RecordId=id,
  131. RR=rr,
  132. Type=type,
  133. Value=value,
  134. )
  135. result = self.visit_url(action_param)
  136. return result
  137. # 删除解析
  138. def delete_domain_record(self, id):
  139. action_param = dict(
  140. Action="DeleteDomainRecord",
  141. RecordId=id,
  142. )
  143. result = self.visit_url(action_param)
  144. return result
  145. if __name__ == '__main__':
  146. # domain = AliDns(ACCESS_KEY_ID, ACCESS_KEY_SECRET, 'simplehttps.com')
  147. # domain.describe_domain_records()
  148. # 增加记录
  149. # domain.add_domain_record("TXT", "test", "test")
  150. # 修改解析
  151. #domain.update_domain_record('4011918010876928', 'TXT', 'test2', 'text2')
  152. # 删除解析记录
  153. # data = domain.describe_domain_records()
  154. # record_list = data["DomainRecords"]["Record"]
  155. # for item in record_list:
  156. # if 'test' in item['RR']:
  157. # domain.delete_domain_record(item['RecordId'])
  158. #print(sys.argv)
  159. file_name, certbot_domain, acme_challenge, certbot_validation = sys.argv
  160. domain = AliDns(ACCESS_KEY_ID, ACCESS_KEY_SECRET, certbot_domain)
  161. data = domain.describe_domain_records()
  162. record_list = data["DomainRecords"]["Record"]
  163. if record_list:
  164. for item in record_list:
  165. if acme_challenge == item['RR']:
  166. domain.delete_domain_record(item['RecordId'])
  167. domain.add_domain_record("TXT", acme_challenge, certbot_validation)